Hello,

Recently I have been trying to use libpq's pipeline mode in a project,
and in the process I have noticed that the PQpipelineSync() function
has a deficiency (which, to be fair, could be an advantage in other
situations): it combines the establishment of a synchronization point
in a pipeline with a flush of the send buffer, i.e. a system call. In
my use case I build up a pipeline of several completely independent
queries, so a synchronization point is required between them, but
performing a system call for each one is unnecessary overhead,
especially if the system is severely affected by mitigations for
Spectre or other security vulnerabilities. That is why I propose
adding an interface to libpq that establishes a synchronization point
in a pipeline without performing any further actions.

I have attached a patch that introduces PQsendSyncMessage(), a
function that is equivalent to PQpipelineSync(), except that it does
not flush anything to the server; the user must subsequently call
PQflush() instead. Viewed another way, the new function is the
counterpart of PQsendFlushRequest(), except that it queues a sync
message instead of a flush request. In addition to reducing the system call overhead of
libpq's pipeline mode, it also makes it easier for the operating
system to send as much of the pipeline as possible in a single TCP (or
lower level protocol) packet when the database is running remotely.
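
As a rough illustration of the saving, the following toy model in C counts simulated flushes for a pipeline of n independent queries. It is not libpq code: ToyConn and every toy_* function are hypothetical stand-ins for the send buffer, PQflush(), PQpipelineSync() and the proposed PQsendSyncMessage(), and each flush of a non-empty buffer is assumed to cost exactly one system call. The real library may also flush on its own once enough data has accumulated, which this sketch ignores.

```c
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a connection's send buffer; NOT libpq's real PGconn. */
typedef struct ToyConn
{
	char		buf[4096];		/* pending outgoing bytes */
	size_t		used;			/* bytes currently queued in buf */
	int			syscalls;		/* simulated send() calls so far */
} ToyConn;

/* Append a message to the buffer without touching the "socket". */
static int
toy_queue(ToyConn *c, const char *msg)
{
	size_t		len = strlen(msg);

	if (c->used + len > sizeof(c->buf))
		return 0;				/* toy model: no automatic flush when full */
	memcpy(c->buf + c->used, msg, len);
	c->used += len;
	return 1;
}

/* Models PQflush(): one simulated system call if anything is pending. */
static int
toy_flush(ToyConn *c)
{
	if (c->used > 0)
	{
		c->syscalls++;
		c->used = 0;
	}
	return 0;
}

/* Models the proposed PQsendSyncMessage(): queue a sync, do not flush. */
static int
toy_send_sync_message(ToyConn *c)
{
	return toy_queue(c, "S");
}

/* Models PQpipelineSync(): queue a sync, then flush immediately. */
static int
toy_pipeline_sync(ToyConn *c)
{
	if (!toy_queue(c, "S"))
		return 0;
	return toy_flush(c) == 0;
}

/*
 * Queue n queries, each followed by a synchronization point, then flush once
 * at the end.  Returns the number of simulated system calls, or -1 if the
 * toy buffer overflows.
 */
static int
toy_run(int n, int flush_every_sync)
{
	ToyConn		c;
	int			i;

	memset(&c, 0, sizeof(c));
	for (i = 0; i < n; i++)
	{
		int			ok;

		if (!toy_queue(&c, "Q"))
			return -1;
		ok = flush_every_sync ? toy_pipeline_sync(&c)
			: toy_send_sync_message(&c);
		if (!ok)
			return -1;
	}
	toy_flush(&c);				/* the explicit PQflush() the caller owes */
	return c.syscalls;
}
```

In this model toy_run(5, 1) performs five flushes, while toy_run(5, 0) performs one, which is the behaviour the proposal is after for small, independent queries.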

I would appreciate your thoughts on my proposal.

Best wishes,
Anton Kirilov
From 5b4f7d4e79b8e7f2a58026a8131a21c235603cc0 Mon Sep 17 00:00:00 2001
From: Anton Kirilov <antonvkirilov@gmail.com>
Date: Wed, 22 Mar 2023 20:39:57 +0000
Subject: [PATCH v1] Add PQsendSyncMessage() to libpq

This new function is equivalent to PQpipelineSync(), except
that it does not flush anything to the server; the user must
subsequently call PQflush() instead. Its purpose is to reduce
the system call overhead of pipeline mode.
---
 doc/src/sgml/libpq.sgml                       |  45 ++++++--
 src/interfaces/libpq/exports.txt              |   1 +
 src/interfaces/libpq/fe-exec.c                |  24 ++++-
 src/interfaces/libpq/libpq-fe.h               |   1 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 101 ++++++++++++++++++
 .../libpq_pipeline/t/001_libpq_pipeline.pl    |   4 +-
 .../traces/multi_pipelines_noflush.trace      |  23 ++++
 7 files changed, 189 insertions(+), 10 deletions(-)
 create mode 100644 src/test/modules/libpq_pipeline/traces/multi_pipelines_noflush.trace

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ee5532c07..70b90331b0 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3318,8 +3318,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
           <listitem>
            <para>
             The <structname>PGresult</structname> represents a
-            synchronization point in pipeline mode, requested by
-            <xref linkend="libpq-PQpipelineSync"/>.
+            synchronization point in pipeline mode, requested by either
+            <xref linkend="libpq-PQpipelineSync"/> or
+            <xref linkend="libpq-PQsendSyncMessage"/>.
             This status occurs only when pipeline mode has been selected.
            </para>
           </listitem>
@@ -4847,7 +4848,8 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendPrepare"/>,
        <xref linkend="libpq-PQsendQueryPrepared"/>,
        <xref linkend="libpq-PQsendDescribePrepared"/>,
-       <xref linkend="libpq-PQsendDescribePortal"/>, or
+       <xref linkend="libpq-PQsendDescribePortal"/>,
+       <xref linkend="libpq-PQsendSyncMessage"/>, or
        <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5227,8 +5229,9 @@ int PQflush(PGconn *conn);
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
      Note that results are buffered on the server side; the server flushes
-     that buffer when a synchronization point is established with
-     <function>PQpipelineSync</function>, or when
+     that buffer when a synchronization point is established with either
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function>, or when
      <function>PQsendFlushRequest</function> is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
@@ -5285,7 +5288,8 @@ int PQflush(PGconn *conn);
      <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
      and <literal>PGRES_PIPELINE_ABORTED</literal>.
      <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
-     <function>PQpipelineSync</function> at the corresponding point
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function> at the corresponding point
      in the pipeline.
      <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5323,7 +5327,8 @@ int PQflush(PGconn *conn);
      <function>PQresultStatus</function> will report a
      <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
      operation in an aborted pipeline. The result for
-     <function>PQpipelineSync</function> is reported as
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function> is reported as
      <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
      and resumption of normal result processing.
     </para>
@@ -5555,6 +5560,32 @@ int PQsendFlushRequest(PGconn *conn);
        </para>
       </listitem>
      </varlistentry>
+
+    <varlistentry id="libpq-PQsendSyncMessage">
+     <term><function>PQsendSyncMessage</function><indexterm><primary>PQsendSyncMessage</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       without flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQsendSyncMessage(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+       Note that the message is not itself flushed to the server automatically;
+       use <function>PQflush</function> if necessary.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
   </sect2>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..9c7818408b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQsendSyncMessage         187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..1baa662b0d 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -82,6 +82,7 @@ static int	PQsendDescribe(PGconn *conn, char desc_type,
 static int	check_field_number(const PGresult *res, int field_num);
 static void pqPipelineProcessQueue(PGconn *conn);
 static int	pqPipelineFlush(PGconn *conn);
+static int	send_sync_message(PGconn *conn, int flush);
 
 
 /* ----------------
@@ -3139,6 +3140,17 @@ pqPipelineProcessQueue(PGconn *conn)
  */
 int
 PQpipelineSync(PGconn *conn)
+{
+	return send_sync_message(conn, 1);
+}
+
+/*
+ * send_sync_message: subroutine for PQpipelineSync and PQsendSyncMessage
+ *		Send a Sync message as part of a pipeline,
+ *		and optionally flush to server
+ */
+static int
+send_sync_message(PGconn *conn, int flush)
 {
 	PGcmdQueueEntry *entry;
 
@@ -3185,7 +3197,7 @@ PQpipelineSync(PGconn *conn)
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (PQflush(conn) < 0)
+	if (flush && PQflush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
@@ -3234,6 +3246,16 @@ PQsendFlushRequest(PGconn *conn)
 	return 1;
 }
 
+/*
+ * PQsendSyncMessage
+ *		Send a Sync message as part of a pipeline without flushing to server
+ */
+int
+PQsendSyncMessage(PGconn *conn)
+{
+	return send_sync_message(conn, 0);
+}
+
 /* ====== accessor funcs for PGresult ======== */
 
 ExecStatusType
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..e22197ee95 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -473,6 +473,7 @@ extern int	PQenterPipelineMode(PGconn *conn);
 extern int	PQexitPipelineMode(PGconn *conn);
 extern int	PQpipelineSync(PGconn *conn);
 extern int	PQsendFlushRequest(PGconn *conn);
+extern int	PQsendSyncMessage(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index f48da7d963..829907957a 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -244,6 +244,104 @@ test_multi_pipelines(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+static void
+test_multi_pipelines_noflush(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi pipeline noflush... ");
+
+	/*
+	 * Queue up a couple of small pipelines and process each without returning
+	 * to command mode first, flushing only once.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQsendSyncMessage(conn) != 1)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	/* OK, start processing the results */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* second pipeline */
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from second pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in pipeline mode ... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+}
+
 /*
  * Test behavior when a pipeline dispatches a number of commands that are
  * not flushed by a sync point.
@@ -1683,6 +1781,7 @@ print_test_list(void)
 {
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
+	printf("multi_pipelines_noflush\n");
 	printf("nosync\n");
 	printf("pipeline_abort\n");
 	printf("pipeline_idle\n");
@@ -1786,6 +1885,8 @@ main(int argc, char **argv)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
+	else if (strcmp(testname, "multi_pipelines_noflush") == 0)
+		test_multi_pipelines_noflush(conn);
 	else if (strcmp(testname, "nosync") == 0)
 		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index 7560439fec..a8a4477b15 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -37,8 +37,8 @@ for my $testname (@tests)
 {
 	my @extraargs = ('-r', $numrows);
 	my $cmptrace = grep(/^$testname$/,
-		qw(simple_pipeline nosync multi_pipelines prepared singlerow
-		  pipeline_abort pipeline_idle transaction
+		qw(simple_pipeline nosync multi_pipelines multi_pipelines_noflush
+		  prepared singlerow pipeline_abort pipeline_idle transaction
 		  disallowed_in_pipeline)) > 0;
 
 	# For a bunch of tests, generate a libpq trace file too.
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines_noflush.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines_noflush.trace
new file mode 100644
index 0000000000..4b9ab07ca4
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines_noflush.trace
@@ -0,0 +1,23 @@
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
+F	4	Terminate
-- 
2.34.1
