From 0eeb62a8844db4178da77e90c8ee49ad4e34405c Mon Sep 17 00:00:00 2001
From: erthalion <9erthalion6@gmail.com>
Date: Mon, 11 Feb 2019 16:06:13 +0100
Subject: [PATCH v11] libpq compression buffers

libpq compression patch with the changed buffering approach.
---
 configure                           |  82 ++++++
 doc/src/sgml/libpq.sgml             |  14 +
 doc/src/sgml/protocol.sgml          |  85 ++++++
 src/Makefile.global.in              |   1 +
 src/backend/Makefile                |   8 +
 src/backend/libpq/be-secure.c       |  72 ++++-
 src/backend/libpq/pqcomm.c          | 137 ++++++----
 src/backend/postmaster/postmaster.c |  10 +
 src/common/Makefile                 |   2 +-
 src/common/zpq_stream.c             | 516 ++++++++++++++++++++++++++++++++++++
 src/include/common/zpq_stream.h     |  44 +++
 src/include/libpq/libpq-be.h        |   2 +
 src/include/libpq/libpq.h           |   6 +-
 src/include/pg_config.h.in          |   3 +
 src/interfaces/libpq/Makefile       |  13 +
 src/interfaces/libpq/fe-connect.c   |  54 +++-
 src/interfaces/libpq/fe-misc.c      |  32 ++-
 src/interfaces/libpq/fe-protocol3.c |  83 ++++++
 src/interfaces/libpq/fe-secure.c    |  70 ++++-
 src/interfaces/libpq/libpq-int.h    |   5 +
 src/tools/msvc/Mkvcbuild.pm         |   2 +-
 21 files changed, 1158 insertions(+), 83 deletions(-)
 create mode 100644 src/common/zpq_stream.c
 create mode 100644 src/include/common/zpq_stream.h

diff --git a/configure b/configure
index ddb3c8b1ba..77df8974b9 100755
--- a/configure
+++ b/configure
@@ -701,6 +701,7 @@ ELF_SYS
 EGREP
 GREP
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 with_libxml
@@ -863,6 +864,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 enable_float4_byval
@@ -8299,6 +8301,86 @@ fi
 
 
 
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
 #
 # Elf
 #
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index c1d1b6b2db..a77adc3a0d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1135,6 +1135,20 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
+        If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
+        visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
+        message and it is up to the client whether to continue work without compression or report error.
+        Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases streaming mode is used.
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d66b860cbd..1b6f863a03 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+    Compression is especial useful for importing/exporting data to/from database using COPY command
+    and for replication (both physical and logical). Also compression can reduce server response time
+    in case of queries returning large amount of data (for example returning JSON, BLOBs, text,...)
+    Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+    configured with --with-zstd option). In both cases streaming mode is used.
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -262,6 +271,20 @@
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         Server acknowledge using compression for client-server communication protocol.
+         Compression can be requested by client by including "compression" option in connection string.
+         Client sends to the server list of compression algorithms, supported by client library.
+         If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
+         visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
+         message and it is up to the client whether to continue work without compression or report error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
@@ -3398,6 +3421,56 @@ AuthenticationSASLFinal (B)
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. Client sends to the server list of compression algorithms, supported by client library.
+  If server supports one of this algorithms, then it responds with CompressionAck with identifier (letter) of first such algorithm.
+  If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression) algorithm.
+  It is up to the client whether to continue work without compression or report error.
+  After receiving this message with algorithm identifer aother than 'n', both server and client are switched to compression mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compresion.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5815,6 +5888,18 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. Value is list of compression algorithms supported by client:
+                        <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib, <literal>'n'</literal> - no compresion.
+                        By default compression is disabled.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index a84b2f96eb..3dd8072fdf 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 478a96db9b..38eb1a5a69 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -51,6 +51,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index a7def3168d..340d84d226 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -143,25 +143,49 @@ secure_close(Port *port)
  *	Read data from a secure connection.
  */
 ssize_t
-secure_read(Port *port, void *ptr, size_t len)
+secure_read(Port *port, ZpqStream *zs, void *ptr, size_t len)
 {
-	ssize_t		n;
+	ssize_t		n = 0;
 	int			waitfor;
+	void *buf;
+	ssize_t buf_len;
 
 	/* Deal with any already-pending interrupt condition. */
 	ProcessClientReadInterrupt(false);
 
 retry:
+#ifdef USE_COMPRESSION
+	if (zs)
+	{
+		n = zpq_read_drain(zs, ptr, len);
+		if (n == 0)
+		{
+			buf = zpq_buffer(zs, ZPQ_READ_BUFFER);
+			buf_len = zpq_buffer_size(zs, ZPQ_READ_BUFFER);
+		}
+		else
+			return n;
+	}
+	else
+	{
+		buf = ptr;
+		buf_len = len;
+	}
+#else
+	buf = ptr;
+	buf_len = len;
+#endif
+
 #ifdef USE_SSL
 	waitfor = 0;
 	if (port->ssl_in_use)
 	{
-		n = be_tls_read(port, ptr, len, &waitfor);
+		n = be_tls_read(port, buf, buf_len, &waitfor);
 	}
 	else
 #endif
 	{
-		n = secure_raw_read(port, ptr, len);
+		n = secure_raw_read(port, buf, buf_len);
 		waitfor = WL_SOCKET_READABLE;
 	}
 
@@ -214,6 +238,13 @@ retry:
 		goto retry;
 	}
 
+#ifdef USE_COMPRESSION
+	if (zs && n > 0)
+	{
+		n = zpq_read(zs, ptr, len, buf, n);
+	}
+#endif
+
 	/*
 	 * Process interrupts that happened during a successful (or non-blocking,
 	 * or hard-failed) read.
@@ -248,25 +279,45 @@ secure_raw_read(Port *port, void *ptr, size_t len)
  *	Write data to a secure connection.
  */
 ssize_t
-secure_write(Port *port, void *ptr, size_t len)
+secure_write(Port *port, ZpqStream *zs, void *ptr, size_t len)
 {
 	ssize_t		n;
 	int			waitfor;
+	void *buf;
+	ssize_t buf_len;
 
 	/* Deal with any already-pending interrupt condition. */
 	ProcessClientWriteInterrupt(false);
 
+#ifdef USE_COMPRESSION
+	if (zs)
+	{
+		buf = zpq_buffer(zs, ZPQ_WRITE_BUFFER);
+		buf_len = zpq_buffer_size(zs, ZPQ_WRITE_BUFFER);
+
+		buf_len = zpq_write(zs, ptr, len, buf, buf_len);
+	}
+	else
+	{
+		buf = ptr;
+		buf_len = len;
+	}
+#else
+	buf = ptr;
+	buf_len = len;
+#endif
+
 retry:
 	waitfor = 0;
 #ifdef USE_SSL
 	if (port->ssl_in_use)
 	{
-		n = be_tls_write(port, ptr, len, &waitfor);
+		n = be_tls_write(port, buf, buf_len, &waitfor);
 	}
 	else
 #endif
 	{
-		n = secure_raw_write(port, ptr, len);
+		n = secure_raw_write(port, buf, buf_len);
 		waitfor = WL_SOCKET_WRITEABLE;
 	}
 
@@ -308,7 +359,12 @@ retry:
 	 */
 	ProcessClientWriteInterrupt(false);
 
-	return n;
+	/*
+	 * compressed could be bigger, maybe because this from zlib:
+	 * destLen is the total size of the destination buffer,
+	 * which must be at least 0.1% larger than sourceLen plus 12 bytes.
+	 */
+	return (n == buf_len) ? len : n;
 }
 
 ssize_t
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index c39617a430..f139ceaffc 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include  "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream* PqStream;
+
+
 /*
  * Message status
  */
@@ -185,6 +189,50 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+	char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
+	char* client_compression_algorithms = port->compression_algorithms;
+	char compression_algorithm = ZPQ_NO_COMPRESSION;
+	char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
+	int rc;
+
+	zpq_get_supported_algorithms(server_compression_algorithms);
+
+	if (client_compression_algorithms)
+	{
+		while (*client_compression_algorithms != '\0')
+		{
+			if (strchr(server_compression_algorithms, *client_compression_algorithms))
+			{
+				compression_algorithm = *client_compression_algorithms;
+				break;
+			}
+			client_compression_algorithms += 1;
+		}
+	}
+
+	compression[5] = compression_algorithm;
+	/* Switch on compression at client side */
+	socket_set_nonblocking(false);
+	while ((rc = secure_write(MyProcPort, PqStream, compression, sizeof(compression))) < 0
+		   && errno == EINTR);
+	if ((size_t)rc != sizeof(compression))
+		return -1;
+
+	/* initialize compression */
+	if (zpq_set_algorithm(compression_algorithm))
+		PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -282,6 +330,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +983,14 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			   r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -953,21 +1006,31 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
-
+		r = secure_read(MyProcPort, PqStream, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const* msg = zpq_error(PqStream);
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -988,7 +1051,7 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		return r;
 	}
 }
 
@@ -1003,7 +1066,7 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1085,7 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1106,11 @@ pq_getbyte_if_available(unsigned char *c)
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1101,7 +1131,7 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1165,7 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1206,7 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1426,13 +1456,13 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
 	{
-		int			r;
-
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		int		r;
+		size_t  available = bufend - bufptr;
+		r = secure_write(MyProcPort, PqStream, bufptr, available);
 
-		if (r <= 0)
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1480,7 +1510,6 @@ internal_flush(void)
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 40a0222220..6cd7a8fdf2 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2050,6 +2050,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4293,6 +4295,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/common/Makefile b/src/common/Makefile
index d0c2b970eb..47cfae5107 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -48,7 +48,7 @@ OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o file_perm.o \
 	ip.o keywords.o kwlookup.o link-canary.o md5.o pg_lzcompress.o \
 	pgfnames.o psprintf.o relpath.o \
 	rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
-	username.o wait_error.o
+	username.o wait_error.o zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += sha2_openssl.o
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000000..71abf8ae4c
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,516 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+static int zpq_algorithm_impl;
+
+typedef struct
+{
+	char    (*name)(void);
+	ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg);
+	ssize_t (*read)(ZpqStream *zs, void *buf, size_t size,
+								   void *source, size_t source_size);
+	ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size,
+									void *target, size_t target_size);
+	void    (*free)(ZpqStream *zs);
+	char const* (*error)(ZpqStream *zs);
+	size_t  (*buffered)(ZpqStream *zs);
+	void*  	(*buffer)(ZpqStream *zs, int type);
+	size_t  (*buffer_size)(ZpqStream *zs, int type);
+	size_t  (*read_drain)(ZpqStream *zs, void *ptr, size_t len);
+} ZpqAlgorithm;
+
+
+#if HAVE_LIBZSTD
+
+#include <malloc.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+typedef struct ZstdStream
+{
+	ZSTD_CStream*  tx_stream;
+	ZSTD_DStream*  rx_stream;
+	ZSTD_outBuffer tx;
+	ZSTD_inBuffer  rx;
+	size_t         tx_not_flushed; /* Amount of datas in internal zstd buffer */
+	size_t         tx_buffered;    /* Data which is consumed by zpq_read but not yet sent */
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+	char const*    rx_error;    /* Decompress error message */
+	size_t         tx_total;
+	size_t         tx_total_raw;
+	size_t         rx_total;
+	size_t         rx_total_raw;
+	char           tx_buf[ZSTD_BUFFER_SIZE];
+	char           rx_buf[ZSTD_BUFFER_SIZE];
+} ZstdStream;
+
+static ZpqStream*
+zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream));
+	zs->tx_stream = ZSTD_createCStream();
+	ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL);
+	zs->rx_stream = ZSTD_createDStream();
+	ZSTD_initDStream(zs->rx_stream);
+	zs->tx.dst = zs->tx_buf;
+	zs->tx.pos = 0;
+	zs->tx.size = ZSTD_BUFFER_SIZE;
+	zs->rx.src = zs->rx_buf;
+	zs->rx.pos = 0;
+	zs->rx.size = 0;
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->tx_buffered = 0;
+	zs->tx_not_flushed = 0;
+	zs->rx_error = NULL;
+	zs->arg = arg;
+	zs->tx_total = zs->tx_total_raw = 0;
+	zs->rx_total = zs->rx_total_raw = 0;
+	return (ZpqStream*)zs;
+}
+
+static ssize_t
+zstd_read(ZpqStream *zstream, void *buf, size_t size,
+							  void *source, size_t source_size)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	ssize_t rc;
+	ZSTD_outBuffer out;
+	out.dst = buf;
+	out.pos = 0;
+	out.size = size;
+
+	while (1)
+	{
+		rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+		if (ZSTD_isError(rc))
+		{
+			zs->rx_error = ZSTD_getErrorName(rc);
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+		/* Return result if we fill requested amount of bytes or read operation was performed */
+		if (out.pos != 0)
+		{
+			zs->rx_total_raw += out.pos;
+			return out.pos;
+		}
+		if (zs->rx.pos == zs->rx.size)
+		{
+			zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+		}
+		rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size);
+		if (rc > 0) /* read fetches some data */
+		{
+			zs->rx.size += rc;
+			zs->rx_total += rc;
+		}
+		else /* read failed */
+		{
+			zs->rx_total_raw += out.pos;
+			return rc;
+		}
+	}
+}
+
+static ssize_t
+zstd_write(ZpqStream *zstream, void const *buf, size_t size,
+							   void *target, size_t target_size)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	ssize_t rc;
+	ZSTD_inBuffer in_buf;
+	in_buf.src = buf;
+	in_buf.pos = 0;
+	in_buf.size = size;
+
+	do
+	{
+		if (zs->tx.pos == 0) /* Compress buffer is empty */
+		{
+			zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+			if (in_buf.pos < size) /* Has something to compress in input buffer */
+				ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+			if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+			{
+				zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+		if (rc > 0)
+		{
+			zs->tx.pos -= rc;
+			zs->tx.dst = (char*)zs->tx.dst + rc;
+			zs->tx_total += rc;
+		}
+		else
+		{
+			zs->tx_buffered = zs->tx.pos;
+			zs->tx_total_raw += in_buf.pos;
+			return rc;
+		}
+	} while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+	zs->tx_total_raw += in_buf.pos;
+	zs->tx_buffered = zs->tx.pos;
+	return in_buf.pos;
+}
+
+static void
+zstd_free(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	if (zs != NULL)
+	{
+		ZSTD_freeCStream(zs->tx_stream);
+		ZSTD_freeDStream(zs->rx_stream);
+		free(zs);
+	}
+}
+
+static char const*
+zstd_error(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	return zs->rx_error;
+}
+
+static size_t
+zstd_buffered(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+static void*
+zstd_buffer(ZpqStream *zstream, int type)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+
+	if (type == ZPQ_READ_BUFFER)
+		return &zs->rx_buff;
+	else if (type == ZPQ_WRITE_BUFFER)
+		return &zs->tx_buff;
+	else
+		return NULL;
+}
+
+static size_t
+zstd_buffer_size(ZpqStream *zstream, int type)
+{
+	return ZSTD_BUFFER_SIZE;
+}
+
+static size_t
+zstd_read_drain(ZpqStream *zstream, void *ptr, size_t len)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	ssize_t		n = 0;
+
+	if (zs->rx.avail_in != 0)
+		return zpq_read(zstream, ptr, len, zs->rx.next_in, n);
+	else
+		return 0;
+}
+
+static char
+zstd_name(void)
+{
+	return 'f';
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <malloc.h>
+#include <zlib.h>
+
+#define ZLIB_BUFFER_SIZE 8192
+#define ZLIB_COMPRESSION_LEVEL 1
+
+typedef struct ZlibStream
+{
+	z_stream tx;
+	z_stream rx;
+
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+
+	size_t         tx_buffered;
+
+	Bytef          tx_buf[ZLIB_BUFFER_SIZE];
+	Bytef          rx_buf[ZLIB_BUFFER_SIZE];
+} ZlibStream;
+
+static ZpqStream*
+zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	int rc;
+	ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream));
+	memset(&zs->tx, 0, sizeof(zs->tx));
+	zs->tx.next_out = zs->tx_buf;
+	zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+	zs->tx_buffered = 0;
+	rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE);
+
+	memset(&zs->rx, 0, sizeof(zs->tx));
+	zs->rx.next_in = zs->rx_buf;
+	zs->rx.avail_in = ZLIB_BUFFER_SIZE;
+	rc = inflateInit(&zs->rx);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE);
+	zs->rx.avail_in = 0;
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->arg = arg;
+
+	return (ZpqStream*)zs;
+}
+
+static ssize_t
+zlib_read(ZpqStream *zstream, void *buf, size_t size, void *source, size_t source_size)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	int rc = 0;
+	zs->rx.next_out = buf;
+	zs->rx.avail_out = size;
+
+	if (source_size > 0)
+	{
+		zs->rx.next_in = zs->rx_buf;
+		zs->rx.avail_in = source_size;
+	}
+
+	while (1)
+	{
+		if (zs->rx.avail_in > 0) /* If there is some data in receiver buffer, then decompress it */
+		{
+			/* Handle Z_BUF_ERROR for no progress? */
+			rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+			if (rc != Z_OK)
+			{
+				return ZPQ_DECOMPRESS_ERROR;
+			}
+			if (zs->rx.avail_out != size)
+			{
+				return size - zs->rx.avail_out;
+			}
+		}
+	}
+}
+
+static ssize_t
+zlib_write(ZpqStream *zstream, void const *buf, size_t size, void *target, size_t target_size)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+    int rc;
+	zs->tx.next_in = (Bytef *)buf;
+	zs->tx.avail_in = size;
+	zs->tx.next_out = target;
+	zs->tx.avail_out = target_size;
+
+	/* repeat sending data until first partial write */
+	do
+	{
+		if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
+		{
+			if (zs->tx.avail_in != 0) /* Has something in input buffer */
+			{
+				/* Handle Z_BUF_ERROR for no progress? */
+				rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+				Assert(rc == Z_OK);
+			}
+		}
+	} while (zs->tx.avail_out == ZLIB_BUFFER_SIZE && zs->tx.avail_in != 0);
+
+	zs->tx_buffered = 0;
+
+	return ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+}
+
+static void
+zlib_free(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	if (zs != NULL)
+	{
+		inflateEnd(&zs->rx);
+		deflateEnd(&zs->tx);
+		free(zs);
+	}
+}
+
+static char const*
+zlib_error(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	return zs->rx.msg;
+}
+
+static size_t
+zlib_buffered(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	return zs != NULL ? zs->tx_buffered : 0;
+}
+
+static void*
+zlib_buffer(ZpqStream *zstream, int type)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+
+	if (type == ZPQ_READ_BUFFER)
+		return &zs->rx_buf;
+	else if (type == ZPQ_WRITE_BUFFER)
+		return &zs->tx_buf;
+	else
+		return NULL;
+}
+
+static size_t
+zlib_buffer_size(ZpqStream *zstream, int type)
+{
+	return ZLIB_BUFFER_SIZE;
+}
+
+static size_t
+zlib_read_drain(ZpqStream *zstream, void *ptr, size_t len)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	ssize_t		n = 0;
+
+	if (zs->rx.avail_in != 0)
+		return zpq_read(zstream, ptr, len, zs->rx.next_in, n);
+	else
+		return 0;
+}
+
+static char
+zlib_name(void)
+{
+	return 'z';
+}
+
+#endif
+
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered, zstd_buffer, zstd_buffer_size, zstd_read_drain},
+#endif
+#if HAVE_LIBZ
+	{zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered, zlib_buffer, zlib_buffer_size, zlib_read_drain},
+#endif
+	{NULL}
+};
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	return zpq_algorithms[zpq_algorithm_impl].create(tx_func, rx_func, arg);
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size,
+						void *source, size_t source_size)
+{
+	return zpq_algorithms[zpq_algorithm_impl].read(zs, buf, size,
+													   source, source_size);
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size,
+						 void *target, size_t target_size)
+{
+	return zpq_algorithms[zpq_algorithm_impl].write(zs, buf, size,
+														target, target_size);
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+	zpq_algorithms[zpq_algorithm_impl].free(zs);
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+	return zpq_algorithms[zpq_algorithm_impl].error(zs);
+}
+
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+	return zpq_algorithms[zpq_algorithm_impl].buffered(zs);
+}
+
+void*
+zpq_buffer(ZpqStream *zs, int type)
+{
+	return zpq_algorithms[zpq_algorithm_impl].buffer(zs, type);
+}
+
+size_t
+zpq_buffer_size(ZpqStream *zs, int type)
+{
+	return zpq_algorithms[zpq_algorithm_impl].buffer_size(zs, type);
+}
+
+size_t
+zpq_read_drain(ZpqStream *zs, void *ptr, size_t len)
+{
+	return zpq_algorithms[zpq_algorithm_impl].read_drain(zs, ptr, len);
+}
+
+void
+zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
+{
+	int i;
+	for (i = 0; zpq_algorithms[i].name != NULL; i++)
+	{
+		Assert(i < ZPQ_MAX_ALGORITHMS);
+		algorithms[i] = zpq_algorithms[i].name();
+	}
+	Assert(i < ZPQ_MAX_ALGORITHMS);
+	algorithms[i] = '\0';
+}
+
+
+bool
+zpq_set_algorithm(char name)
+{
+	int i;
+	if (name != ZPQ_NO_COMPRESSION)
+	{
+		for (i = 0; zpq_algorithms[i].name != NULL; i++)
+		{
+			if (zpq_algorithms[i].name() == name)
+			{
+				zpq_algorithm_impl = i;
+				return true;
+			}
+		}
+	}
+	return false;
+}
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000000..676c26f07b
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,44 @@
+/*
+ * zpq_stream.h
+ *     Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+#define ZPQ_MAX_ALGORITHMS (8)
+#define ZPQ_NO_COMPRESSION 'n'
+
+#define ZPQ_READ_BUFFER 0
+#define ZPQ_WRITE_BUFFER 1
+
+#if HAVE_LIBZ || HAVE_LIBZSTD
+#define USE_COMPRESSION
+#endif
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size,
+								void *source, size_t source_size);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size,
+								 void *target, size_t target_size);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered(ZpqStream* zs);
+void* zpq_buffer(ZpqStream* zs, int type);
+size_t zpq_buffer_size(ZpqStream* zs, int type);
+size_t zpq_read_drain(ZpqStream* zs, void *ptr, size_t len);
+void zpq_free(ZpqStream* zs);
+
+void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]);
+bool zpq_set_algorithm(char name);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 248055f10b..f74460cd9d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -164,6 +164,8 @@ typedef struct Port
 	int			keepalives_interval;
 	int			keepalives_count;
 
+	char*       compression_algorithms; /* Compression algorithms supported by client */
+
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 
 	/*
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 755819cc58..223223184e 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -19,6 +19,7 @@
 #include "lib/stringinfo.h"
 #include "libpq/libpq-be.h"
 #include "storage/latch.h"
+#include  "common/zpq_stream.h"
 
 
 typedef struct
@@ -61,6 +62,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int  pq_configure(Port* port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
@@ -89,8 +91,8 @@ extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
 extern int	secure_open_server(Port *port);
 extern void secure_close(Port *port);
-extern ssize_t secure_read(Port *port, void *ptr, size_t len);
-extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+extern ssize_t secure_read(Port *port, ZpqStream *zs, void *ptr, size_t len);
+extern ssize_t secure_write(Port *port, ZpqStream *zs, void *ptr, size_t len);
 extern ssize_t secure_raw_read(Port *port, void *ptr, size_t len);
 extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len);
 
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 82547f321f..14ed5a3e0c 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -377,6 +377,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 025542dfe9..42c3287472 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -27,6 +27,19 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
 OBJS=	fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
 	fe-protocol2.o fe-protocol3.o pqexpbuffer.o fe-secure.o \
 	libpq-events.o
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index f29202db5f..c93dd57694 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -73,6 +73,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "mb/pg_wchar.h"
 #include "port/pg_bswap.h"
 
@@ -320,6 +321,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "COMPRESSION", NULL, NULL,
+	    "Libpq-compression", "", 1,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -426,6 +431,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2859,11 +2868,46 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						char algorithm;
+						/* Read message length word */
+						if (pqGetInt(&msgLength, 4, conn))
+						{
+							/* We'll come back when there is more data */
+							return PGRES_POLLING_READING;
+						}
+						if (msgLength != 5)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "expected compression algorithm specification message length is 5 bytes, but %d is recevied\n"),
+											  msgLength);
+							goto error_return;
+						}
+						pqGetc(&algorithm, conn);
+						if (!zpq_set_algorithm(algorithm))
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server is not supported requested compression algorithm\n"));
+							goto error_return;
+						}
+						/* mark byte consumed */
+						conn->inStart = conn->inCursor;
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+					} else
+						break;
 				}
 
 				/*
@@ -3664,6 +3708,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index e5ef8d44bd..21c38894ba 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,13 +53,15 @@
 #include "port/pg_bswap.h"
 #include "pg_config_paths.h"
 
+#include  <common/zpq_stream.h>
 
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
-			  time_t end_time);
+static int  pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
+
 /*
  * PQlibVersion: return the libpq version number
  */
@@ -679,9 +681,18 @@ pqReadData(PGconn *conn)
 	/* OK, try to read some data */
 retry3:
 	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+								conn->inBufSize - conn->inEnd);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry3;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -769,9 +780,18 @@ retry3:
 	 */
 retry4:
 	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+								conn->inBufSize - conn->inEnd);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry4;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -845,11 +865,9 @@ pqSendSome(PGconn *conn, int len)
 	while (len > 0)
 	{
 		int			sent;
-
 #ifndef WIN32
 		sent = pqsecure_write(conn, ptr, len);
 #else
-
 		/*
 		 * Windows can fail on large sends, per KB article Q201213. The
 		 * failure-point appears to be different in different versions of
@@ -896,7 +914,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0)
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 47dbc3186f..2f53f518f3 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2135,6 +2135,79 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 	return startpacket;
 }
 
+static bool
+parse_bool(const char *value, bool *result)
+{
+	switch (*value)
+	{
+		case 't':
+		case 'T':
+			if (pg_strcasecmp(value, "true") == 0)
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case 'f':
+		case 'F':
+			if (pg_strcasecmp(value, "false") == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case 'y':
+		case 'Y':
+			if (pg_strcasecmp(value, "yes") == 0)
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case 'n':
+		case 'N':
+			if (pg_strcasecmp(value, "no") == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case 'o':
+		case 'O':
+			/* 'o' is not unique enough */
+			if (pg_strcasecmp(value, "on") == 0)
+			{
+				*result = true;
+				return true;
+			}
+			else if (pg_strcasecmp(value, "off") == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case '1':
+			if (value[1] == '\0')
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case '0':
+			if (value[1] == '\0')
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		default:
+			break;
+	}
+
+	*result = false;		/* suppress compiler warning */
+	return false;
+}
+
 /*
  * Build a startup packet given a filled-in PGconn structure.
  *
@@ -2181,6 +2254,16 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		bool enabled;
+		if (parse_bool(conn->compression, &enabled))
+		{
+			char compression_algorithms[ZPQ_MAX_ALGORITHMS];
+			zpq_get_supported_algorithms(compression_algorithms);
+			ADD_STARTUP_OPTION("compression", compression_algorithms);
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index 4658e27caa..5087ae13e2 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -212,18 +212,50 @@ pqsecure_close(PGconn *conn)
 ssize_t
 pqsecure_read(PGconn *conn, void *ptr, size_t len)
 {
-	ssize_t		n;
+	ssize_t		n = 0;
+	void *buf;
+	ssize_t buf_len;
+
+#ifdef USE_COMPRESSION
+	if (conn->zstream)
+	{
+		n = zpq_read_drain(conn->zstream, ptr, len);
+
+		if (n == 0)
+		{
+			buf = zpq_buffer(conn->zstream, ZPQ_READ_BUFFER);
+			buf_len = zpq_buffer_size(conn->zstream, ZPQ_READ_BUFFER);
+		}
+		else
+			return n;
+	}
+	else
+	{
+		buf = ptr;
+		buf_len = len;
+	}
+#else
+	buf = ptr;
+	buf_len = len;
+#endif
 
 #ifdef USE_SSL
 	if (conn->ssl_in_use)
 	{
-		n = pgtls_read(conn, ptr, len);
+		n = pgtls_read(conn, buf, buf_len);
 	}
 	else
 #endif
 	{
-		n = pqsecure_raw_read(conn, ptr, len);
+		n = pqsecure_raw_read(conn, buf, buf_len);
+	}
+
+#ifdef USE_COMPRESSION
+	if (conn->zstream && n > 0)
+	{
+		n = zpq_read(conn->zstream, ptr, len, buf, n);
 	}
+#endif
 
 	return n;
 }
@@ -290,21 +322,47 @@ ssize_t
 pqsecure_write(PGconn *conn, const void *ptr, size_t len)
 {
 	ssize_t		n;
+	ssize_t 	buf_len;
+	void 		*buf;
+
+#ifdef USE_COMPRESSION
+	if (conn->zstream)
+	{
+		buf = zpq_buffer(conn->zstream, ZPQ_WRITE_BUFFER);
+		buf_len = zpq_buffer_size(conn->zstream, ZPQ_WRITE_BUFFER);
+
+		buf_len = zpq_write(conn->zstream, ptr, len, buf, buf_len);
+	}
+	else
+	{
+		buf = ptr;
+		buf_len = len;
+	}
+#else
+	buf = ptr;
+	buf_len = len;
+#endif
 
 #ifdef USE_SSL
 	if (conn->ssl_in_use)
 	{
-		n = pgtls_write(conn, ptr, len);
+		n = pgtls_write(conn, buf, buf_len);
 	}
 	else
 #endif
 	{
-		n = pqsecure_raw_write(conn, ptr, len);
+		n = pqsecure_raw_write(conn, buf, buf_len);
 	}
 
-	return n;
+	/*
+	 * compressed could be bigger, maybe because this from zlib:
+	 * destLen is the total size of the destination buffer,
+	 * which must be at least 0.1% larger than sourceLen plus 12 bytes.
+	 */
+	return (n == buf_len) ? len : n;
 }
 
+
 ssize_t
 pqsecure_raw_write(PGconn *conn, const void *ptr, size_t len)
 {
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4a93d8edbc..b7c4f64421 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -358,6 +359,7 @@ struct pg_conn
 	char	   *sslrootcert;	/* root certificate filename */
 	char	   *sslcrl;			/* certificate revocation list filename */
 	char	   *requirepeer;	/* required peer credentials for local sockets */
+	char	   *compression;    /* stream compression (0 or 1) */
 
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 	char	   *krbsrvname;		/* Kerberos service name */
@@ -500,6 +502,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream* zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 56192f1b20..7830529cb2 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -119,7 +119,7 @@ sub mkvcbuild
 	our @pgcommonallfiles = qw(
 	  base64.c config_info.c controldata_utils.c exec.c file_perm.c ip.c
 	  keywords.c kwlookup.c link-canary.c md5.c
-	  pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+	  zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c unicode_norm.c username.c
 	  wait_error.c);
 
-- 
2.16.4

