On 24.11.2020 20:34, Daniil Zakhlystov wrote:
The following review has been posted through the commitfest application:
make installcheck-world: tested, failed
Implements feature: tested, passed
Spec compliant: tested, failed
Documentation: tested, failed
Submission review
--
Is the patch in a patch format which has context? (eg: context diff format)
NO, need to fix
Does it apply cleanly to the current git master?
YES
Does it include reasonable tests, necessary doc patches, etc?
has docs, but tests are missing
Usability review
--
At the moment, the patch supports per-connection (permanent) compression. The
frontend can specify the desired compression algorithms and compression levels
and then negotiate with the backend which compression algorithm will actually
be used. In its current state the patch is missing the ability to enable/disable
compression on the backend side, which I think is not great from the usability
point of view.
Regarding on-the-fly configurable compression and different compression
algorithms for each direction - these two ideas are promising but tend to make
the implementation more complex. However, the current implementation can be
extended to support these approaches in the future. For example, we could expose
switchable on-the-fly compression as a ‘switchable’ algorithm and negotiate it
like a regular compression algorithm (the way ‘zstd’ and ‘zlib’ are negotiated
now). The ‘switchable’ algorithm could then introduce new protocol messages to
make the on-the-fly compression magic work.
The same applies to Robert’s idea of different compression algorithms for the
two directions - we can introduce it later as a new compression algorithm with
its own protocol messages.
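To illustrate with connection strings (the ‘switchable’ value here is purely
hypothetical and not something the current patch implements):
psql -d "host=xxx port=5432 dbname=xxx compression=zlib,zstd:5"   (current patch: permanent compression)
psql -d "host=xxx port=5432 dbname=xxx compression=switchable"    (possible future on-the-fly mode)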
Does the patch actually implement that?
YES
Do we want that?
YES
Do we already have it?
NO
Does it follow SQL spec, or the community-agreed behavior?
To be discussed
Does it include pg_dump support (if applicable)?
not applicable
Are there dangers?
a CRIME-like attack is theoretically possible when compression is used together with SSL
Have all the bases been covered?
To be discussed
Feature test
--
I’ve applied the patch, compiled, and tested it with configure options
--enable-cassert and --enable-debug turned on. I’ve tested the following
scenarios:
1. make check
=======================
All 201 tests passed.
=======================
2. make check-world
initially failed with:
============== running regression test queries ==============
test postgres_fdw ... FAILED 4465 ms
============== shutting down postmaster ==============
======================
1 of 1 tests failed.
======================
The differences that caused some tests to fail can be viewed in the
file "/xxx/xxx/review/postgresql/contrib/postgres_fdw/regression.diffs". A copy of the
test summary that you see above is saved in the file
"/xxx/xxx/review/postgresql/contrib/postgres_fdw/regression.out".
All tests passed after replacing ‘gsslib, target_session_attrs’ with ‘gsslib,
compression, target_session_attrs’ in line 8914 of
postgresql/contrib/postgres_fdw/expected/postgres_fdw.out
3. simple psql utility usage
psql -d "host=xxx port=5432 dbname=xxx user=xxx compression=1"
4. pgbench tpcb-like w/ SSL turned ON
pgbench "host=xxx port=5432 dbname=xxx user=xxx sslmode=require compression=1"
--builtin tpcb-like -t 70 --jobs=32 --client=700
5. pgbench tpcb-like w/ SSL turned OFF
pgbench "host=xxx port=5432 dbname=xxx user=xxx sslmode=disable compression=1"
--builtin tpcb-like -t 70 --jobs=32 --client=700
6. pgbench initialization w/ SSL turned ON
pgbench "host=xxx port=5432 dbname=xxx user=xxx sslmode=require compression=1"
-i -s 500
7. pgbench initialization w/ SSL turned OFF
pgbench "host=xxx port=5432 dbname=xxx user=xxx sslmode=disable compression=1"
-i -s 500
8. Streaming physical replication. Recovery-related parameters
recovery_target_timeline = 'latest'
primary_conninfo = 'host=xxx port=5432 user=repl application_name=xxx
compression=1'
primary_slot_name = 'xxx'
restore_command = 'some command'
9. This compression has been implemented in an experimental build of the odyssey
connection pooler and tested in a configuration with ~1500 synthetic simultaneous
clients and ~300 GB databases.
During the testing, I’ve reported and fixed some of the issues.
Does the feature work as advertised?
YES
Are there corner cases the author has failed to consider?
NO
Are there any assertion failures or crashes?
NO
Performance review
--
Does the patch slow down simple tests?
NO
If it claims to improve performance, does it?
YES
Does it slow down other things?
Using compression may add CPU overhead, which mostly depends on the compression
algorithm and the chosen compression level. During testing with the ZSTD
algorithm at compression level 1 there was about 10% CPU overhead in read/write
balanced scenarios and almost no overhead in mostly-read scenarios.
Coding review
--
In protocol.sgml:
It can be just boolean values enabling or disabling compression
("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "auto" or explicit
list of compression algorithms
separated by comma with optional specification of compression level:
"zlib,zstd:5".
But in fe-protocol3.c:
if (pg_strcasecmp(value, "true") == 0 ||
pg_strcasecmp(value, "yes") == 0 ||
pg_strcasecmp(value, "on") == 0 ||
pg_strcasecmp(value, "any") == 0 ||
pg_strcasecmp(value, "1") == 0)
{
I believe there is a mismatch - the docs mention an “auto” value, but in the code
“auto” is missing while “any” exists. Actually, I propose to remove both
“auto” and “any” because they work the same way as “true/on/yes/1”
but look like something else.
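For example, the frontend check could then be reduced to plain boolean parsing
plus an explicit algorithm list (just a sketch of the idea, not actual patch code):
if (pg_strcasecmp(value, "true") == 0 ||
    pg_strcasecmp(value, "yes") == 0 ||
    pg_strcasecmp(value, "on") == 0 ||
    pg_strcasecmp(value, "1") == 0)
{
    /* enable compression with every algorithm the client library supports */
}
else if (pg_strcasecmp(value, "false") == 0 ||
         pg_strcasecmp(value, "no") == 0 ||
         pg_strcasecmp(value, "off") == 0 ||
         pg_strcasecmp(value, "0") == 0)
{
    /* compression disabled */
}
else
{
    /* treat the value as an explicit algorithm list, e.g. "zlib,zstd:5" */
}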
In fe-protocol3.c:
#define pq_read_conn(conn) \
    (conn->zstream \
     ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \
                conn->inBufSize - conn->inEnd) \
     : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \
                     conn->inBufSize - conn->inEnd))
I think there should be a comment explaining the logic of choosing the read
function. The same applies to the zpq_write calls. Also, pq_read_conn is defined
as a macro, but there is no corresponding macro for pq_write_conn.
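For example, something along these lines would already help (just a sketch of the
kind of comment I have in mind, based on the macro quoted above):
/*
 * Read via the decompressing stream if compression was negotiated for this
 * connection (conn->zstream is set), otherwise read directly from the
 * (possibly SSL-wrapped) socket.
 */
#define pq_read_conn(conn) \
    (conn->zstream \
     ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \
                conn->inBufSize - conn->inEnd) \
     : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \
                     conn->inBufSize - conn->inEnd))
A symmetric pq_write_conn macro is less straightforward because zpq_write() also
reports the number of consumed raw bytes via its *processed argument, but even
there a comment explaining the zstream check would make the write path easier to
follow.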
In configure.ac:
if test "$with_zstd" = yes; then
AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
[AC_MSG_ERROR([zstd library not found
If you have zstd already installed, see config.log for details on the
failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zstd to disable zstd support.])])
fi
if test "$with_zstd" = yes; then
AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
If you have zstd already installed, see config.log for details on the
failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zstd to disable zstd support.])])
fi
Looks like the lines mentioning --without-zstd are incorrect: zstd support is
opt-in via --with-zstd, so suggesting --without-zstd as the way to disable it
makes no sense.
In fe-connect.c:
if (index == (char)-1)
{
appendPQExpBuffer(&conn->errorMessage,
libpq_gettext(
"server is not supported requested compression algorithms %s\n"),
conn->compression);
goto error_return;
}
Right now this error might be displayed in two cases:
1. The backend supports compression, but it is somehow disabled/turned off.
2. The backend supports compression, but does not support the requested algorithms.
I think that it is a good idea to differentiate these two cases. Maybe define
the following behavior somewhere in the docs:
“When connecting to an older backend which does not support compression, or when
the backend supports compression but for some reason wants to disable it, the
backend will simply ignore the _pq_.compression parameter and won’t send the
CompressionAck message to the frontend.”
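For instance, the mismatch case could produce a more specific message, while the
"ignored request" case is left to the client (a rough sketch, not the actual
patch code):
if (index == (char)-1)
{
    /* server sent CompressionAck but supports none of the requested algorithms */
    appendPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("server does not support any of the requested compression algorithms: %s\n"),
                      conn->compression);
    goto error_return;
}
/*
 * The other case - the server never sends CompressionAck at all because
 * compression is disabled there or the server is too old - could be detected
 * by the client after startup and reported separately, e.g. "server ignored
 * the compression request".
 */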
To sum up, I think that the current implementation already brings good
benefits. As I proposed in the usability review, we may introduce the new
approaches later as separate compression 'algorithms'.
Thanks,
Daniil Zakhlystov
Thank you for the review.
A new version of the patch addressing the reported issues is attached.
I added a libpq_compression GUC that makes it possible to prohibit compression
on the server side if for some reason (security, high CPU load, ...) it is not
desired.
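For example, to prohibit compression server-wide it is enough to set (the GUC is
SIGHUP-level, so a configuration reload suffices):
    libpq_compression = off
in postgresql.conf; the server will then ignore the client's _pq_.compression
parameter and never send CompressionAck.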
diff --git a/configure b/configure
index dd64692345..9f707733c3 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
LDFLAGS_SL
LDFLAGS_EX
with_zlib
+with_zstd
with_system_tzdata
with_libxslt
XML2_LIBS
@@ -867,6 +868,7 @@ with_libxml
with_libxslt
with_system_tzdata
with_zlib
+with_zstd
with_gnu_ld
enable_largefile
'
@@ -8571,6 +8573,85 @@ fi
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+ withval=$with_zstd;
+ case $withval in
+ yes)
+ ;;
+ no)
+ :
+ ;;
+ *)
+ as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+ ;;
+ esac
+
+else
+ with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zstd_ZSTD_compress=yes
+else
+ ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+ LIBS="-lzstd $LIBS"
+
+else
+ as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
#
# Zlib
diff --git a/configure.ac b/configure.ac
index 748fb50236..4e2435c49a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
[do not use Zlib])
AC_SUBST(with_zlib)
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+ [use zstd])
+AC_SUBST(with_zstd)
+
#
# Assignments
#
@@ -1186,6 +1193,13 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+ [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.])])
+fi
+
if test "$enable_spinlocks" = yes; then
AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
else
@@ -1400,6 +1414,13 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
if test "$with_gssapi" = yes ; then
AC_CHECK_HEADERS(gssapi/gssapi.h, [],
[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3795c57004..e0feb360a6 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -974,6 +974,24 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
+ <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+ <term><varname>libpq_compression</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>libpq_compression</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ When this parameter is <literal>on</literal> (the default), the <productname>PostgreSQL</productname>
+ server may enable compression of traffic between server and client if it is requested by the client.
+ The client sends the server a list of compression algorithms supported by the frontend library;
+ the server chooses one that is supported by the backend library and returns it to the client in a
+ compression acknowledgement message. This option allows the server to reject a compression request
+ even if compression is supported (due to security, CPU consumption or other reasons).
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect2>
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9d4b6ab4a8..a8f6919efd 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,22 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
</listitem>
</varlistentry>
+ <varlistentry id="libpq-connect-compression" xreflabel="compression">
+ <term><literal>compression</literal></term>
+ <listitem>
+ <para>
+ Request compression of libpq traffic. The client sends the server a list of compression algorithms supported by the client library.
+ If the server supports one of these algorithms, it acknowledges the use of this algorithm, and all subsequent libpq messages, sent both from client to server and
+ vice versa, will be compressed. If the server does not support any of the suggested algorithms, it rejects the client's request to use compression,
+ and it is up to the client whether to continue working without compression or to report an error.
+ The set of supported compression algorithms is chosen at configure time. Right now two libraries are supported: zlib (the default) and zstd (if Postgres was
+ configured with the --with-zstd option). In both cases streaming mode is used.
+ By default compression is disabled. Please note that using compression together with SSL may add extra vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
<term><literal>client_encoding</literal></term>
<listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee28889e1..d5fd532f95 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
such as <command>COPY</command>.
</para>
+ <para>
+ It is possible to compress protocol data to reduce traffic and speed up client-server interaction.
+ Compression is especially useful for importing/exporting data to/from the database using the COPY command
+ and for replication (both physical and logical). Compression can also reduce server response time
+ for queries returning large amounts of data (for example JSON, BLOBs, text, ...).
+ Right now two libraries are supported: zlib (the default) and zstd (if Postgres was
+ configured with the --with-zstd option).
+ </para>
+
<sect2 id="protocol-message-concepts">
<title>Messaging Overview</title>
@@ -262,6 +271,27 @@
</listitem>
</varlistentry>
+ <varlistentry>
+ <term>CompressionAck</term>
+ <listitem>
+ <para>
+ The server acknowledges the use of compression for the client-server communication protocol.
+ Compression can be requested by the client by including the "compression" option in the connection string.
+ It can be just boolean values enabling or disabling compression
+ ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "auto" or explicit list of compression algorithms
+ separated by comma with optional specification of compression level: "zlib,zstd:5".
+ If the compression algorithm is not explicitly specified, the most efficient one supported by both
+ client and server is chosen. The client sends the server a list of compression algorithms
+ supported by the client library.
+ If the server supports one of these algorithms, it acknowledges the use of this algorithm, and
+ all subsequent libpq messages, sent both from client to server and
+ vice versa, will be compressed. The server selects the most efficient algorithm from the list specified by the client and returns
+ the index of the chosen algorithm in this list to the client. If the server does not support any of the suggested algorithms, it replies with -1,
+ and it is up to the client whether to continue working without compression or to report an error.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term>AuthenticationOk</term>
<listitem>
@@ -3398,6 +3428,57 @@ following:
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('z')
+</term>
+<listitem>
+<para>
+ Acknowledges the use of compression for protocol data. The client sends the server a list of compression algorithms supported by the client library.
+ If the server supports one of these algorithms, it acknowledges the use of this algorithm, and all subsequent libpq messages, sent both from client to server and
+ vice versa, will be compressed. The server selects the most efficient algorithm from the list specified by the client and returns
+ the index of the chosen algorithm in this list to the client. If the server does not support any of the suggested algorithms, it replies with -1,
+ and it is up to the client whether to continue working without compression or to report an error.
+ After receiving this message with an algorithm index other than -1, both server and client switch to compression mode
+ and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Byte1
+</term>
+<listitem>
+<para>
+ Index of the algorithm in the list of supported algorithms specified by the client, or -1 if none of them is supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
<varlistentry>
<term>
@@ -5964,6 +6045,22 @@ StartupMessage (F)
</para>
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+ <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+ Request compression of libpq traffic. The value is a list of compression algorithms supported by the client, with an optional
+ specification of compression level: <literal>"zlib,zstd:5"</literal>.
+ When connecting to an older backend which does not support compression, or when the backend supports compression
+ but for some reason wants to disable it, the backend will simply ignore the _pq_.compression parameter and won’t send
+ the CompressionAck message to the frontend.
+ By default compression is disabled. Please note that using compression together with SSL may add extra vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
</variablelist>
In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9aac5..9e11599002 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm = @with_llvm@
with_system_tzdata = @with_system_tzdata@
with_uuid = @with_uuid@
with_zlib = @with_zlib@
+with_zstd = @with_zstd@
enable_rpath = @enable_rpath@
enable_nls = @enable_nls@
enable_debug = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95848..f32a780b81 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
LIBS += -lsystemd
endif
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
##########################################################################
all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2e4aa1c4b6..9d05ec28d2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
LEFT JOIN pg_database AS D ON (S.datid = D.oid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
+CREATE VIEW pg_stat_network_traffic AS
+ SELECT
+ S.pid,
+ S.rx_raw_bytes,
+ S.tx_raw_bytes,
+ S.rx_compressed_bytes,
+ S.tx_compressed_bytes
+ FROM pg_stat_get_activity(NULL) AS S;
+
CREATE VIEW pg_stat_replication AS
SELECT
S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index d7de962a04..fe4f014e67 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
#include <signal.h>
#include <fcntl.h>
#include <grp.h>
+#include <pgstat.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/socket.h>
@@ -93,6 +94,8 @@
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
/*
* Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +121,7 @@
*/
int Unix_socket_permissions;
char *Unix_socket_group;
+bool libpq_compression;
/* Where the Unix socket files are (list of palloc'd strings) */
static List *sock_paths = NIL;
@@ -141,6 +145,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+static ZpqStream* PqStream;
+
+
/*
* Message status
*/
@@ -183,6 +190,99 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
WaitEventSet *FeBeWaitSet;
+static ssize_t write_compressed(void* arg, void const* data, size_t size)
+{
+ ssize_t rc = secure_write((Port*)arg, (void*)data, size);
+ if (rc > 0)
+ pgstat_report_network_traffic(0, 0, 0, rc);
+ return rc;
+}
+
+static ssize_t read_compressed(void* arg, void* data, size_t size)
+{
+ ssize_t rc = secure_read((Port*)arg, data, size);
+ if (rc > 0)
+ pgstat_report_network_traffic(0, 0, rc, 0);
+ return rc;
+}
+
+
+/* --------------------------------
+ * pq_configure - configure connection using port settings
+ *
+ * Right now only compression is configured here.
+ * Returns 0 in case of success, non-zero in case of error.
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+ char* client_compression_algorithms = port->compression_algorithms;
+
+ /*
+ * If the client requests compression, it sends a list of supported compression algorithms separated by commas.
+ */
+ if (client_compression_algorithms && libpq_compression)
+ {
+ int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+ char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
+ int impl = -1;
+ int rc;
+ char** server_compression_algorithms = zpq_get_supported_algorithms();
+ int index = -1;
+
+ for (int i = 0; *client_compression_algorithms; i++)
+ {
+ char* sep = strchr(client_compression_algorithms, ',');
+ char* level;
+ if (sep != NULL)
+ *sep = '\0';
+
+ level = strchr(client_compression_algorithms, ':');
+ if (level != NULL)
+ {
+ *level = '\0'; /* compression level is ignored now */
+ if (sscanf(level+1, "%d", &compression_level) != 1)
+ ereport(LOG,
+ (errmsg("Invalid compression level: %s", level+1)));
+ }
+ for (impl = 0; server_compression_algorithms[impl] != NULL; impl++)
+ {
+ if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0)
+ {
+ index = i;
+ goto SendCompressionAck;
+ }
+ }
+
+ if (sep != NULL)
+ client_compression_algorithms = sep+1;
+ else
+ break;
+ }
+ SendCompressionAck:
+ free(server_compression_algorithms);
+ compression[5] = (char)index;
+ /* Send 'z' message to the client with selected compression algorithm (or -1 if not found) */
+ socket_set_nonblocking(false);
+ while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0
+ && errno == EINTR);
+ if ((size_t)rc != sizeof(compression))
+ return -1;
+
+ if (index >= 0) /* Use compression */
+ {
+ PqStream = zpq_create(impl, compression_level, write_compressed, read_compressed, MyProcPort, NULL, 0);
+ if (!PqStream)
+ {
+ ereport(LOG,
+ (errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl])));
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
/* --------------------------------
* pq_init - initialize libpq at backend startup
@@ -280,6 +380,9 @@ socket_close(int code, Datum arg)
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
+ /* Release compression streams */
+ zpq_free(PqStream);
+
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +1022,15 @@ socket_set_nonblocking(bool nonblocking)
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
- * returns 0 if OK, EOF if trouble
+ * nowait parameter toggles non-blocking mode.
+ * returns number of read bytes, EOF if trouble
* --------------------------------
*/
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ int r;
+
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1046,36 @@ pq_recvbuf(void)
}
/* Ensure that we're in blocking mode */
- socket_set_nonblocking(false);
+ socket_set_nonblocking(nowait);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
- int r;
-
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ /* If streaming compression is enabled then use correspondent compression read function. */
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ char const* msg = zpq_error(PqStream);
+ if (msg == NULL)
+ msg = "end of stream";
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to decompress data: %s", msg)));
+ return EOF;
+ }
if (errno == EINTR)
continue; /* Ok if interrupted */
+ if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return 0;
+
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
@@ -975,7 +1096,8 @@ pq_recvbuf(void)
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
- return 0;
+ pgstat_report_network_traffic(r, 0, 0, 0);
+ return r;
}
}
@@ -990,7 +1112,7 @@ pq_getbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1131,7 @@ pq_peekbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1026,48 +1148,15 @@ pq_peekbyte(void)
int
pq_getbyte_if_available(unsigned char *c)
{
- int r;
+ int r = 0;
Assert(PqCommReadingMsg);
- if (PqRecvPointer < PqRecvLength)
+ if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
- /* Put the socket into non-blocking mode */
- socket_set_nonblocking(true);
-
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
- {
- /*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket). Report
- * other errors.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client would
- * cause recursion to here, leading to stack overflow and core
- * dump! This message must go *only* to the postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
- r = EOF;
- }
-
return r;
}
@@ -1088,7 +1177,7 @@ pq_getbytes(char *s, size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1211,7 @@ pq_discardbytes(size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1252,7 @@ pq_getstring(StringInfo s)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
@@ -1413,13 +1502,19 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+ /* has more data to flush or unsent data in internal compression buffer */
{
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
+ bufptr += processed;
+ PqSendStart += processed;
+
+ if (r < 0 || (r == 0 && available))
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
@@ -1462,12 +1557,12 @@ internal_flush(void)
InterruptPending = 1;
return EOF;
}
+ pgstat_report_network_traffic(0, r, 0, 0);
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
PqSendStart += r;
}
-
PqSendStart = PqSendPointer = 0;
return 0;
}
@@ -1484,7 +1579,7 @@ socket_flush_if_writable(void)
int res;
/* Quick exit if nothing to do */
- if (PqSendPointer == PqSendStart)
+ if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
return 0;
/* No-op if reentrant call */
@@ -1507,7 +1602,7 @@ socket_flush_if_writable(void)
static bool
socket_is_send_pending(void)
{
- return (PqSendStart < PqSendPointer);
+ return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
}
/* --------------------------------
@@ -1985,3 +2080,15 @@ pq_settcpusertimeout(int timeout, Port *port)
return STATUS_OK;
}
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+ char const* algorithm_name = PqStream ? zpq_algorithm_name(PqStream) : NULL;
+ if (algorithm_name)
+ PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+ else
+ PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e76e627c6b..4714b5197b 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3115,6 +3115,9 @@ pgstat_bestart(void)
lbeentry.st_xact_start_timestamp = 0;
lbeentry.st_databaseid = MyDatabaseId;
+ lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+ lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
/* We have userid for client-backends, wal-sender and bgworker processes */
if (lbeentry.st_backendType == B_BACKEND
|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3496,6 +3499,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
+/*
+ * Report the amount of raw and compressed traffic transferred in both
+ * directions by the current backend (values are added to the running totals).
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+ volatile PgBackendStatus *beentry = MyBEEntry;
+
+ if (!pgstat_track_activities || !beentry)
+ return;
+
+ /*
+ * Update my status entry, following the protocol of bumping
+ * st_changecount before and after. We use a volatile pointer here to
+ * ensure the compiler doesn't try to get cute.
+ */
+ PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+ beentry->st_rx_raw_bytes += rx_raw_bytes;
+ beentry->st_tx_raw_bytes += tx_raw_bytes;
+ beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+ beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+ PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
/* ----------
* pgstat_read_current_status() -
*
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7799ed1d2..330ddafcf7 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2136,6 +2136,8 @@ retry1:
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
+ else if (strcmp(nameptr, "_pq_.compression") == 0)
+ port->compression_algorithms = pstrdup(valptr);
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
@@ -4443,6 +4445,14 @@ BackendInitialize(Port *port)
if (status != STATUS_OK)
proc_exit(0);
+ if (pq_configure(port))
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to send compression message: %m")));
+ proc_exit(0);
+ }
+
/*
* Now that we have the user and database name, we can set the process
* title for ps. It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a210fc93b4..b93ac11443 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum
pg_stat_get_activity(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_ACTIVITY_COLS 30
+#define PG_STAT_GET_ACTIVITY_COLS 34
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
values[28] = BoolGetDatum(false); /* GSS Encryption not in
* use */
}
+ values[30] = beentry->st_rx_raw_bytes;
+ values[31] = beentry->st_tx_raw_bytes;
+ values[32] = beentry->st_rx_compressed_bytes;
+ values[33] = beentry->st_tx_compressed_bytes;
}
else
{
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[27] = true;
nulls[28] = true;
nulls[29] = true;
+ nulls[30] = true;
+ nulls[31] = true;
+ nulls[32] = true;
+ nulls[33] = true;
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
PG_RETURN_TIMESTAMPTZ(result);
}
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS 4
+ TupleDesc tupdesc;
+ Datum values[PG_STAT_NETWORK_TRAFFIC_COLS];
+ bool nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+ int32 beid = PG_GETARG_INT32(0);
+ PgBackendStatus *beentry;
+
+ if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+ PG_RETURN_NULL();
+ else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+ PG_RETURN_NULL();
+
+ /* Initialise values and NULL flags arrays */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ /* Initialise attributes information in the tuple descriptor */
+ tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+ INT8OID, -1, 0);
+ BlessTupleDesc(tupdesc);
+
+ /* Fill values and NULLs */
+ values[0] = beentry->st_rx_raw_bytes;
+ values[1] = beentry->st_tx_raw_bytes;
+ values[2] = beentry->st_rx_compressed_bytes;
+ values[3] = beentry->st_tx_compressed_bytes;
+
+ /* Returns the record as Datum */
+ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
Datum
pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630e8e..83a3c1f3ea 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1289,6 +1289,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+ gettext_noop("Compress client-server traffic."),
+ NULL
+ },
+ &libpq_compression,
+ true,
+ NULL, NULL, NULL
+ },
+
{
{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
gettext_noop("Logs each checkpoint."),
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd642..bc6cba8ba9 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
unicode_norm.o \
username.o \
wait_error.o \
- wchar.o
+ wchar.o \
+ zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000000..459566228a
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,543 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+ /*
+ * Returns name of compression algorithm.
+ */
+ char const* (*name)(void);
+
+ /*
+ * Create a compression stream, using the rx/tx functions for fetching/sending compressed data.
+ * level: compression level
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for receiving compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+ ZpqStream* (*create)(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size);
+
+ /*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * The error code is either ZPQ_DECOMPRESS_ERROR or an error code returned by the rx function.
+ */
+ ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
+
+ /*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns the number of written raw bytes or an error code returned by the tx function.
+ * In the latter case the amount of written raw bytes is stored in *processed.
+ */
+ ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
+
+ /*
+ * Free stream created by create function.
+ */
+ void (*free)(ZpqStream *zs);
+
+ /*
+ * Get error message.
+ */
+ char const* (*error)(ZpqStream *zs);
+
+ /*
+ * Returns the amount of data in the internal tx compression buffer.
+ */
+ size_t (*buffered_tx)(ZpqStream *zs);
+
+ /*
+ * Returns the amount of data in the internal rx decompression buffer.
+ */
+ size_t (*buffered_rx)(ZpqStream *zs);
+} ZpqAlgorithm;
+
+struct ZpqStream
+{
+ ZpqAlgorithm const* algorithm;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+
+typedef struct ZstdStream
+{
+ ZpqStream common;
+ ZSTD_CStream* tx_stream;
+ ZSTD_DStream* rx_stream;
+ ZSTD_outBuffer tx;
+ ZSTD_inBuffer rx;
+ size_t tx_not_flushed; /* Amount of data in internal zstd buffer */
+ size_t tx_buffered; /* Data which is consumed by zstd_write but not yet sent */
+ size_t rx_buffered; /* Data which is needed for zstd_read */
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+ char const* rx_error; /* Decompress error message */
+ size_t tx_total;
+ size_t tx_total_raw;
+ size_t rx_total;
+ size_t rx_total_raw;
+ char tx_buf[ZSTD_BUFFER_SIZE];
+ char rx_buf[ZSTD_BUFFER_SIZE];
+} ZstdStream;
+
+static ZpqStream*
+zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream));
+
+ zs->tx_stream = ZSTD_createCStream();
+ ZSTD_initCStream(zs->tx_stream, level);
+ zs->rx_stream = ZSTD_createDStream();
+ ZSTD_initDStream(zs->rx_stream);
+ zs->tx.dst = zs->tx_buf;
+ zs->tx.pos = 0;
+ zs->tx.size = ZSTD_BUFFER_SIZE;
+ zs->rx.src = zs->rx_buf;
+ zs->rx.pos = 0;
+ zs->rx.size = 0;
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->tx_buffered = 0;
+ zs->rx_buffered = 0;
+ zs->tx_not_flushed = 0;
+ zs->rx_error = NULL;
+ zs->arg = arg;
+ zs->tx_total = zs->tx_total_raw = 0;
+ zs->rx_total = zs->rx_total_raw = 0;
+ zs->rx.size = rx_data_size;
+ Assert(rx_data_size < ZSTD_BUFFER_SIZE);
+ memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+ return (ZpqStream*)zs;
+}
+
+static ssize_t
+zstd_read(ZpqStream *zstream, void *buf, size_t size)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ ssize_t rc;
+ ZSTD_outBuffer out;
+ out.dst = buf;
+ out.pos = 0;
+ out.size = size;
+
+ while (1)
+ {
+ if (zs->rx.pos != zs->rx.size || zs->rx_buffered == 0)
+ {
+ rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+ if (ZSTD_isError(rc))
+ {
+ zs->rx_error = ZSTD_getErrorName(rc);
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ /* Return result if we fill requested amount of bytes or read operation was performed */
+ if (out.pos != 0)
+ {
+ zs->rx_total_raw += out.pos;
+ zs->rx_buffered = 0;
+ return out.pos;
+ }
+ zs->rx_buffered = rc;
+ if (zs->rx.pos == zs->rx.size)
+ {
+ zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+ }
+ }
+ rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size);
+ if (rc > 0) /* read fetches some data */
+ {
+ zs->rx.size += rc;
+ zs->rx_total += rc;
+ }
+ else /* read failed */
+ {
+ zs->rx_total_raw += out.pos;
+ return rc;
+ }
+ }
+}
+
+static ssize_t
+zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ ssize_t rc;
+ ZSTD_inBuffer in_buf;
+ in_buf.src = buf;
+ in_buf.pos = 0;
+ in_buf.size = size;
+
+ do
+ {
+ if (zs->tx.pos == 0) /* Compress buffer is empty */
+ {
+ zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (in_buf.pos < size) /* Has something to compress in input buffer */
+ ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+ if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+ {
+ zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+ if (rc > 0)
+ {
+ zs->tx.pos -= rc;
+ zs->tx.dst = (char*)zs->tx.dst + rc;
+ zs->tx_total += rc;
+ }
+ else
+ {
+ *processed = in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ zs->tx_total_raw += in_buf.pos;
+ return rc;
+ }
+ /* repeat sending while there is some data in input or internal zstd buffer */
+ } while (in_buf.pos < size || zs->tx_not_flushed);
+
+ zs->tx_total_raw += in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ return in_buf.pos;
+}
+
+static void
+zstd_free(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ if (zs != NULL)
+ {
+ ZSTD_freeCStream(zs->tx_stream);
+ ZSTD_freeDStream(zs->rx_stream);
+ free(zs);
+ }
+}
+
+static char const*
+zstd_error(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs->rx_error;
+}
+
+static size_t
+zstd_buffered_tx(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+static size_t
+zstd_buffered_rx(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs != NULL ? zs->rx.size - zs->rx.pos : 0;
+}
+
+static char const*
+zstd_name(void)
+{
+ return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+#define ZLIB_BUFFER_SIZE 8192 /* We have to flush stream after each protocol command
+ * and command is mostly limited by record length,
+ * which in turn is usually less than page size (except TOAST)
+ */
+
+typedef struct ZlibStream
+{
+ ZpqStream common;
+
+ z_stream tx;
+ z_stream rx;
+
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+ unsigned tx_deflate_pending;
+ size_t tx_buffered;
+
+ Bytef tx_buf[ZLIB_BUFFER_SIZE];
+ Bytef rx_buf[ZLIB_BUFFER_SIZE];
+} ZlibStream;
+
+static ZpqStream*
+zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ int rc;
+ ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream));
+ memset(&zs->tx, 0, sizeof(zs->tx));
+ zs->tx.next_out = zs->tx_buf;
+ zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+ zs->tx_buffered = 0;
+ rc = deflateInit(&zs->tx, level);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE);
+
+ memset(&zs->rx, 0, sizeof(zs->tx));
+ zs->rx.next_in = zs->rx_buf;
+ zs->rx.avail_in = ZLIB_BUFFER_SIZE;
+ zs->tx_deflate_pending = 0;
+ rc = inflateInit(&zs->rx);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE);
+
+ zs->rx.avail_in = rx_data_size;
+ Assert(rx_data_size < ZLIB_BUFFER_SIZE);
+ memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->arg = arg;
+
+ return (ZpqStream*)zs;
+}
+
+static ssize_t
+zlib_read(ZpqStream *zstream, void *buf, size_t size)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ int rc;
+ zs->rx.next_out = (Bytef *)buf;
+ zs->rx.avail_out = size;
+
+ while (1)
+ {
+ if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+ {
+ rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+ if (rc != Z_OK && rc != Z_BUF_ERROR)
+ {
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ if (zs->rx.avail_out != size)
+ {
+ return size - zs->rx.avail_out;
+ }
+ if (zs->rx.avail_in == 0)
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ }
+ else
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+ if (rc > 0)
+ {
+ zs->rx.avail_in += rc;
+ }
+ else
+ {
+ return rc;
+ }
+ }
+}
+
+static ssize_t
+zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ int rc;
+ zs->tx.next_in = (Bytef *)buf;
+ zs->tx.avail_in = size;
+ do
+ {
+ if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
+ {
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */
+ {
+ rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+ Assert(rc == Z_OK);
+ deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out);
+ if (rc > 0)
+ {
+ zs->tx.next_out += rc;
+ zs->tx.avail_out += rc;
+ }
+ else
+ {
+ *processed = size - zs->tx.avail_in;
+ zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+ return rc;
+ }
+ /* repeat sending while there is some data in input or deflate buffer */
+ } while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0);
+
+ zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+
+ return size - zs->tx.avail_in;
+}
+
+static void
+zlib_free(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ if (zs != NULL)
+ {
+ inflateEnd(&zs->rx);
+ deflateEnd(&zs->tx);
+ free(zs);
+ }
+}
+
+static char const*
+zlib_error(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ return zs->rx.msg;
+}
+
+static size_t
+zlib_buffered_tx(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0;
+}
+
+static size_t
+zlib_buffered_rx(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ return zs != NULL ? zs->rx.avail_in : 0;
+}
+
+static char const*
+zlib_name(void)
+{
+ return "zlib";
+}
+
+#endif
+
+static char const*
+no_compression_name(void)
+{
+ return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+ {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx},
+#endif
+#if HAVE_LIBZ
+ {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx},
+#endif
+ {no_compression_name}
+};
+
+/*
+ * Create a compression stream using the algorithm at the given index in the zpq_algorithms array.
+ */
+ZpqStream*
+zpq_create(int algorithm_impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ ZpqStream* stream = zpq_algorithms[algorithm_impl].create(level, tx_func, rx_func, arg, rx_data, rx_data_size);
+ if (stream)
+ stream->algorithm = &zpq_algorithms[algorithm_impl];
+ return stream;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+ return zs->algorithm->read(zs, buf, size);
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed)
+{
+ return zs->algorithm->write(zs, buf, size, processed);
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+ if (zs)
+ zs->algorithm->free(zs);
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+ return zs->algorithm->error(zs);
+}
+
+
+size_t
+zpq_buffered_rx(ZpqStream *zs)
+{
+ return zs ? zs->algorithm->buffered_rx(zs) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream *zs)
+{
+ return zs ? zs->algorithm->buffered_tx(zs) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ */
+char**
+zpq_get_supported_algorithms(void)
+{
+ size_t n_algorithms = sizeof(zpq_algorithms)/sizeof(*zpq_algorithms);
+ char** algorithm_names = malloc(n_algorithms*sizeof(char*));
+
+ for (size_t i = 0; i < n_algorithms; i++)
+ {
+ algorithm_names[i] = (char*)zpq_algorithms[i].name();
+ }
+
+ return algorithm_names;
+}
+
+char const*
+zpq_algorithm_name(ZpqStream *zs)
+{
+ return zs ? zs->algorithm->name() : NULL;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 33dacfd340..baa4d6d448 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5239,9 +5239,9 @@
proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'int4',
- proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+ proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
prosrc => 'pg_stat_get_activity' },
{ oid => '3318',
descr => 'statistics: information about progress of backends running maintenance command',
@@ -5507,6 +5507,14 @@
proargnames => '{wal_buffers_full,stats_reset}',
prosrc => 'pg_stat_get_wal' },
+{ oid => '1137', descr => 'statistics: information about network traffic',
+ proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+ proallargtypes => '{int4,int8,int8,int8,int8}',
+ proargmodes => '{i,o,o,o,o}',
+ proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+ prosrc => 'pg_stat_get_network_traffic' },
+
{ oid => '2306', descr => 'statistics: information about SLRU caches',
proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5638,6 +5646,10 @@
proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
+{ oid => '4142', descr => 'connection compression algorithm',
+ proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+ proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
{ oid => '1946',
descr => 'convert bytea value into some ascii-only text string',
proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000000..27aef0aab9
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,36 @@
+/*
+ * zpq_stream.h
+ * Streaming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+ZpqStream* zpq_create(int impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered_rx(ZpqStream* zs);
+size_t zpq_buffered_tx(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+char const* zpq_algorithm_name(ZpqStream* zs);
+
+/*
+ Returns a NULL-terminated array of compression algorithm names
+*/
+char** zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281ad5..5289f9f001 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,8 @@ typedef struct Port
int keepalives_count;
int tcp_user_timeout;
+ char* compression_algorithms; /* Compression algorithms supported by client */
+
/*
* GSSAPI structures.
*/
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b1152475ac..224ff3d47b 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
extern void pq_init(void);
+extern int pq_configure(Port* port);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 781d86c8ef..c0d15f1cfb 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -150,6 +150,7 @@ typedef struct StartupPacket
} StartupPacket;
extern bool Db_user_namespace;
+extern bool libpq_compression;
/*
* In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838e53..b336eeb175 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
/* Define to 1 if you have the `link' function. */
#undef HAVE_LINK
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
/* Define to 1 if the system has the type `locale_t'. */
#undef HAVE_LOCALE_T
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 257e515bfe..b43efc0499 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1190,6 +1190,12 @@ typedef struct PgBackendStatus
/* application name; MUST be null-terminated */
char *st_appname;
+ /* client-server traffic information */
+ uint64 st_rx_raw_bytes;
+ uint64 st_tx_raw_bytes;
+ uint64 st_rx_compressed_bytes;
+ uint64 st_tx_compressed_bytes;
+
/*
* Current command string; MUST be null-terminated. Note that this string
* possibly is truncated in the middle of a multi-byte character. As
@@ -1403,6 +1409,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
extern void pgstat_report_tempfile(size_t filesize);
extern void pgstat_report_appname(const char *appname);
extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
extern const char *pgstat_get_wait_event(uint32 wait_event_info);
extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b340..be8cb34e40 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
# The MSVC build system scrapes OBJS from this file. If you change any of
# the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
OBJS = \
$(WIN32RES) \
fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d010f..f407257ac4 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
#include "common/ip.h"
#include "common/link-canary.h"
#include "common/scram-common.h"
+#include "common/zpq_stream.h"
#include "common/string.h"
#include "fe-auth.h"
#include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"compression", "COMPRESSION", NULL, NULL,
+ "Libpq-compression", "", 16,
+ offsetof(struct pg_conn, compression)},
+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
DefaultTargetSessionAttrs, NULL,
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
void
pqDropConnection(PGconn *conn, bool flushInput)
{
+ /* Release compression streams */
+ zpq_free(conn->zstream);
+ conn->zstream = NULL;
+
/* Drop any SSL state */
pqsecure_close(conn);
@@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn)
return 1; /* success! */
case PGRES_POLLING_READING:
+ /*
+ * If there is some buffered RX data in the ZpqStream,
+ * then don't proceed to pqWaitTimed.
+ */
+ if (zpq_buffered_rx(conn->zstream)) {
+ break;
+ }
+
ret = pqWaitTimed(1, 0, conn, finish_time);
if (ret == -1)
{
@@ -3216,11 +3231,62 @@ keep_going: /* We will come back to here until there is
*/
conn->inCursor = conn->inStart;
- /* Read type byte */
- if (pqGetc(&beresp, conn))
+ while (1)
{
- /* We'll come back when there is more data */
- return PGRES_POLLING_READING;
+ /* Read type byte */
+ if (pqGetc(&beresp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+
+ if (beresp == 'z') /* Switch on compression */
+ {
+ int index;
+ char resp;
+ /* Read message length word */
+ if (pqGetInt(&msgLength, 4, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+ if (msgLength != 5)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "expected compression algorithm specification message length is 5 bytes, but %d is received\n"),
+ msgLength);
+ goto error_return;
+ }
+ if (pqGetc(&resp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+ index = resp;
+ if (index == (char)-1)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "server is not supported requested compression algorithms %s\n"),
+ conn->compression);
+ goto error_return;
+ }
+ Assert(!conn->zstream);
+ conn->zstream = zpq_create(conn->compressors[index].impl,
+ conn->compressors[index].level,
+ (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+ &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+ if (!conn->zstream)
+ {
+ char** supported_algorithms = zpq_get_supported_algorithms();
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "failed to initialize compressor %s\n"),
+ supported_algorithms[conn->compressors[index].impl]);
+ free(supported_algorithms);
+ goto error_return;
+ }
+ /* reset buffer */
+ conn->inStart = conn->inCursor = conn->inEnd = 0;
+ }
+ else
+ break;
}
/*
@@ -4020,6 +4086,8 @@ freePGconn(PGconn *conn)
free(conn->dbName);
if (conn->replication)
free(conn->replication);
+ if (conn->compression)
+ free(conn->compression);
if (conn->pguser)
free(conn->pguser);
if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..00ccefbac0 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
* EOF indication. We expect therefore that this won't result in any
* undue delay in reporting a previous write failure.)
*/
- if (flushResult ||
- pqWait(true, false, conn) ||
+ if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+ pqWait(true, false, conn)) ||
pqReadData(conn) < 0)
{
/*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f33fb..768d856cf3 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,12 +53,24 @@
#include "pg_config_paths.h"
#include "port/pg_bswap.h"
+#include <common/zpq_stream.h>
+
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
+/*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn) \
+ (conn->zstream \
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \
+ conn->inBufSize - conn->inEnd) \
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \
+ conn->inBufSize - conn->inEnd))
+
/*
* PQlibVersion: return the libpq version number
*/
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
/* OK, try to read some data */
retry3:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -759,10 +778,18 @@ retry3:
* arrived.
*/
retry4:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
+
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len)
}
/* while there's still data to send */
- while (len > 0)
+ while (len > 0 || zpq_buffered_tx(conn->zstream))
{
int sent;
-
+ size_t processed = 0;
+ /*
+ * Use zpq_write if compression is switched on
+ */
+ sent = conn->zstream
+ ? zpq_write(conn->zstream, ptr, len, &processed)
#ifndef WIN32
- sent = pqsecure_write(conn, ptr, len);
+ : pqsecure_write(conn, ptr, len);
#else
/*
@@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len)
* failure-point appears to be different in different versions of
* Windows, but 64k should always be safe.
*/
- sent = pqsecure_write(conn, ptr, Min(len, 65536));
+ : pqsecure_write(conn, ptr, Min(len, 65536));
#endif
+ ptr += processed;
+ len -= processed;
+ remaining -= processed;
if (sent < 0)
{
@@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}
- if (len > 0)
+ if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
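
The recurring zpq_buffered_rx()/zpq_buffered_tx() checks above (and in fe-exec.c and fe-protocol3.c below) implement a drain-before-wait rule: once compression is on, decompressed input or pending compressed output can sit inside the ZpqStream while the socket itself is idle, so libpq must not block in pqWait()/select() while the stream still holds data. A condensed illustration of that rule, not part of the patch:

#include <stdbool.h>
#include "common/zpq_stream.h"

/*
 * It is only safe to block waiting for socket readiness when the
 * compression stream holds no pending input or output; otherwise we
 * could stall even though progress is possible without touching the
 * socket.  (The patch calls zpq_buffered_rx() with a possibly NULL
 * stream, so these functions are assumed to tolerate NULL.)
 */
static bool
safe_to_block(ZpqStream *zs)
{
	return zpq_buffered_rx(zs) == 0 && zpq_buffered_tx(zs) == 0;
}
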
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..b83dc069de 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
if (async)
return 0;
/* Need to load more data */
- if (pqWait(true, false, conn) ||
+ if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
pqReadData(conn) < 0)
return -2;
continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
{
/* need to load more data */
- if (pqWait(true, false, conn) ||
+ if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
pqReadData(conn) < 0)
{
*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
if (needInput)
{
/* Wait for some data to arrive (or for the channel to close) */
- if (pqWait(true, false, conn) ||
+ if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
pqReadData(conn) < 0)
break;
}
@@ -2134,6 +2134,148 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
return startpacket;
}
+/*
+ * Build a comma-separated list of compression algorithms suggested by the client to the server.
+ * The list can either be explicitly specified by the user in the connection string, or
+ * include all algorithms supported by the client library.
+ * This function returns true if the compression string is successfully parsed; it
+ * stores the comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, NULL is assigned to *client_compressors.
+ * It also creates an array of compressor descriptors, each element of which corresponds
+ * to the algorithm name at the same position in the *client_compressors list. This array is
+ * stored in PGconn and is used during the handshake when the compression acknowledgment
+ * response is received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
+{
+ char** supported_algorithms = zpq_get_supported_algorithms();
+ char* value = conn->compression;
+ int n_supported_algorithms;
+ int total_len = 0;
+ int i;
+
+ for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+ {
+ total_len += strlen(supported_algorithms[n_supported_algorithms])+1;
+ }
+
+ if (pg_strcasecmp(value, "true") == 0 ||
+ pg_strcasecmp(value, "yes") == 0 ||
+ pg_strcasecmp(value, "on") == 0 ||
+ pg_strcasecmp(value, "any") == 0 ||
+ pg_strcasecmp(value, "1") == 0)
+ {
+ /* Compression is enabled: choose algorithm automatically */
+ char* p;
+
+ if (n_supported_algorithms == 0)
+ {
+ *client_compressors = NULL; /* no compressors are available */
+ conn->compressors = NULL;
+ return true;
+ }
+ *client_compressors = p = malloc(total_len);
+ if (build_descriptors)
+ conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+ for (i = 0; i < n_supported_algorithms; i++)
+ {
+ strcpy(p, supported_algorithms[i]);
+ p += strlen(p);
+ *p++ = ',';
+ if (build_descriptors)
+ {
+ conn->compressors[i].impl = i;
+ conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+ }
+ }
+ p[-1] = '\0';
+ return true;
+ }
+ else if (*value == 0 ||
+ pg_strcasecmp(value, "false") == 0 ||
+ pg_strcasecmp(value, "no") == 0 ||
+ pg_strcasecmp(value, "off") == 0 ||
+ pg_strcasecmp(value, "0") == 0)
+ {
+ /* Compression is disabled */
+ *client_compressors = NULL;
+ conn->compressors = NULL;
+ return true;
+ }
+ else
+ {
+ /* List of compression algorithms separated by commas */
+ char *src, *dst;
+ int n_suggested_algorithms = 0;
+
+ *client_compressors = src = dst = strdup(value);
+
+ if (build_descriptors)
+ conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+
+ while (*src != '\0')
+ {
+ char* sep = strchr(src, ',');
+ char* col;
+ int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+
+ if (sep != NULL)
+ *sep = '\0';
+
+ strcpy(dst, src);
+
+ col = strchr(src, ':');
+ if (col != NULL)
+ {
+ *col = '\0';
+ if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors)
+ {
+ fprintf(stderr,
+ libpq_gettext("WARNING: invlaid compression level %s in compression option '%s'\n"),
+ col+1, value);
+ return false;
+ }
+ }
+ for (i = 0; supported_algorithms[i] != NULL; i++)
+ {
+ if (pg_strcasecmp(src, supported_algorithms[i]) == 0)
+ {
+ if (build_descriptors)
+ {
+ conn->compressors[n_suggested_algorithms].impl = i;
+ conn->compressors[n_suggested_algorithms].level = compression_level;
+ }
+ n_suggested_algorithms += 1;
+ dst += strlen(dst);
+ *dst++ = ',';
+ break;
+ }
+ }
+ if (sep)
+ src = sep+1;
+ else
+ break;
+ }
+ if (n_suggested_algorithms == 0)
+ {
+ if (!build_descriptors)
+ fprintf(stderr,
+ libpq_gettext("WARNING: none of specified algirthms %s is supported by client\n"),
+ value);
+ else
+ {
+ free(conn->compressors);
+ conn->compressors = NULL;
+ }
+ free(*client_compressors);
+ *client_compressors = NULL;
+ return false;
+ }
+ dst[-1] = '\0';
+ return true;
+ }
+}
+
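
For reference, the syntax this parser accepts in the connection string is either a boolean-ish value (true/false, yes/no, on/off, 0/1, any) or a comma-separated algorithm list in which each name may carry an optional ':level' suffix. A hypothetical client call, with placeholder host and database names:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* ask for zstd at level 5, falling back to zlib at the default level */
	PGconn	   *conn = PQconnectdb("host=localhost dbname=test "
								   "compression=zstd:5,zlib");

	if (PQstatus(conn) != CONNECTION_OK)
		fprintf(stderr, "%s", PQerrorMessage(conn));
	PQfinish(conn);
	return 0;
}
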
/*
* Build a startup packet given a filled-in PGconn structure.
*
@@ -2180,6 +2322,17 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("replication", conn->replication);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
+ if (conn->compression && conn->compression[0])
+ {
+ char* client_compression_algorithms;
+ if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL))
+ {
+ if (client_compression_algorithms)
+ {
+ ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+ }
+ }
+ }
if (conn->send_appname)
{
/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..89128df7a5 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
/* include stuff common to fe and be */
#include "getaddrinfo.h"
#include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
/* include stuff found in fe only */
#include "pqexpbuffer.h"
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
* found in password file. */
} pg_conn_host;
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+ int impl; /* compression implementation index */
+ int level; /* compression level */
+} pg_conn_compressor;
+
/*
* PGconn stores all the state data associated with a single connection
* to a backend.
@@ -370,6 +381,9 @@ struct pg_conn
char *ssl_min_protocol_version; /* minimum TLS protocol version */
char *ssl_max_protocol_version; /* maximum TLS protocol version */
+ char *compression; /* stream compression (boolean value, "any", or comma-separated list of compression algorithms) */
+ pg_conn_compressor* compressors; /* descriptors of compression algorithms chosen by client */
+
/* Type of connection to make. Possible values: any, read-write. */
char *target_session_attrs;
@@ -527,6 +541,9 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Compression stream */
+ ZpqStream* zstream;
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 097ff5d111..aa9359ca6c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
s.backend_xmin,
s.query,
s.backend_type
- FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+ FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
LEFT JOIN pg_database d ON ((s.datid = d.oid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
s.gss_auth AS gss_authenticated,
s.gss_princ AS principal,
s.gss_enc AS encrypted
- FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+ FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+ s.rx_raw_bytes,
+ s.tx_raw_bytes,
+ s.rx_compressed_bytes,
+ s.tx_compressed_bytes
+ FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
pg_stat_progress_analyze| SELECT s.pid,
s.datid,
d.datname,
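
The new pg_stat_network_traffic view above exposes the per-backend raw vs. compressed byte counters. A minimal sketch of a C client reading it; the connection string is a placeholder and the column names follow the view definition above:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres compression=on");
	PGresult   *res = PQexec(conn,
							 "SELECT pid, rx_raw_bytes, tx_raw_bytes, "
							 "       rx_compressed_bytes, tx_compressed_bytes "
							 "FROM pg_stat_network_traffic");

	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) > 0)
		printf("pid %s: rx %s raw / %s compressed bytes\n",
			   PQgetvalue(res, 0, 0),
			   PQgetvalue(res, 0, 1),
			   PQgetvalue(res, 0, 3));
	PQclear(res);
	PQfinish(conn);
	return 0;
}
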
@@ -2015,7 +2021,7 @@ pg_stat_replication| SELECT s.pid,
w.sync_priority,
w.sync_state,
w.reply_time
- FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+ FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_replication_slots| SELECT s.slot_name,
@@ -2046,7 +2052,7 @@ pg_stat_ssl| SELECT s.pid,
s.ssl_client_dn AS client_dn,
s.ssl_client_serial AS client_serial,
s.ssl_issuer_dn AS issuer_dn
- FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+ FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
WHERE (s.client_port IS NOT NULL);
pg_stat_subscription| SELECT su.oid AS subid,
su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd41b..daac58c662 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
config_info.c controldata_utils.c d2s.c encnames.c exec.c
f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
keywords.c kwlookup.c link-canary.c md5.c
- pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+ pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
wait_error.c wchar.c);