Hello,

On 25/04/2023 15:23, Denis Laxalde wrote:
> This sounds like a useful addition to me. I've played a bit with it in
> Psycopg and it works fine.

Thank you very much for reviewing my patch! I have attached a new
version that addresses your comments and is rebased on top of the
current tip of the master branch, i.e. commit
7b7fa85130330128b404eddebd4f33c6739454b0 (the rebase required fixing a
merge conflict).

For the sake of others who might read this e-mail thread, I would like
to mention that my patch is complete (including documentation and
tests, but modulo review comments, of course), and that it passes the
tests, i.e.:

    make check
    make -C src/test/modules/libpq_pipeline check

Best wishes,
Anton Kirilov

From 7aef7b2cf1ffea0786ab1fb4eca9d85ce7242cf0 Mon Sep 17 00:00:00 2001
From: Anton Kirilov <antonvkirilov@gmail.com>
Date: Wed, 22 Mar 2023 20:39:57 +0000
Subject: [PATCH v2] Add PQsendSyncMessage() to libpq

This new function is equivalent to PQpipelineSync(), except
that it does not flush anything to the server; the user must
subsequently call PQflush() instead. Its purpose is to reduce
the system call overhead of pipeline mode.
---
 doc/src/sgml/libpq.sgml                       | 45 ++++++++++++++++---
 src/interfaces/libpq/exports.txt              |  1 +
 src/interfaces/libpq/fe-exec.c                | 24 +++++++++-
 src/interfaces/libpq/libpq-fe.h               |  1 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 37 +++++++++++++++
 .../traces/multi_pipelines.trace              | 11 +++++
 6 files changed, 111 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index cce25d06e6..c2ee982135 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3490,8 +3490,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
          <listitem>
           <para>
            The <structname>PGresult</structname> represents a
-           synchronization point in pipeline mode, requested by
-           <xref linkend="libpq-PQpipelineSync"/>.
+           synchronization point in pipeline mode, requested by either
+           <xref linkend="libpq-PQpipelineSync"/> or
+           <xref linkend="libpq-PQsendSyncMessage"/>.
            This status occurs only when pipeline mode has been selected.
           </para>
          </listitem>
@@ -5019,7 +5020,8 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendPrepare"/>,
        <xref linkend="libpq-PQsendQueryPrepared"/>,
        <xref linkend="libpq-PQsendDescribePrepared"/>,
-       <xref linkend="libpq-PQsendDescribePortal"/>, or
+       <xref linkend="libpq-PQsendDescribePortal"/>,
+       <xref linkend="libpq-PQsendSyncMessage"/>, or
        <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5399,8 +5401,9 @@ int PQflush(PGconn *conn);
      client sends them. The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
      Note that results are buffered on the server side; the server flushes
-     that buffer when a synchronization point is established with
-     <function>PQpipelineSync</function>, or when
+     that buffer when a synchronization point is established with either
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function>, or when
      <function>PQsendFlushRequest</function> is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
@@ -5457,7 +5460,8 @@ int PQflush(PGconn *conn);
      <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
      and <literal>PGRES_PIPELINE_ABORTED</literal>.
      <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
-     <function>PQpipelineSync</function> at the corresponding point
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function> at the corresponding point
      in the pipeline.
      <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5495,7 +5499,8 @@ int PQflush(PGconn *conn);
      <function>PQresultStatus</function> will report a
      <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
      operation in an aborted pipeline. The result for
-     <function>PQpipelineSync</function> is reported as
+     <function>PQpipelineSync</function> or
+     <function>PQsendSyncMessage</function> is reported as
      <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
      and resumption of normal result processing.
     </para>
@@ -5727,6 +5732,32 @@ int PQsendFlushRequest(PGconn *conn);
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQsendSyncMessage">
+     <term><function>PQsendSyncMessage</function><indexterm><primary>PQsendSyncMessage</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       without flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQsendSyncMessage(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+       Note that the message is not itself flushed to the server automatically;
+       use <function>PQflush</function> if necessary.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
 
   </sect2>
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 7ded77aff3..3c364d692a 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -187,3 +187,4 @@ PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
 PQconnectionUsedGSSAPI    187
+PQsendSyncMessage         188
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index a16bbf32ef..83113d6e6e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -82,6 +82,7 @@ static int	PQsendDescribe(PGconn *conn, char desc_type,
 static int	check_field_number(const PGresult *res, int field_num);
 static void pqPipelineProcessQueue(PGconn *conn);
 static int	pqPipelineFlush(PGconn *conn);
+static int	send_sync_message(PGconn *conn, bool flush);
 
 
 /* ----------------
@@ -3143,6 +3144,17 @@ pqPipelineProcessQueue(PGconn *conn)
  */
 int
 PQpipelineSync(PGconn *conn)
+{
+	return send_sync_message(conn, true);
+}
+
+/*
+ * send_sync_message: subroutine for PQpipelineSync and PQsendSyncMessage
+ *	Send a Sync message as part of a pipeline,
+ *	and optionally flush to server
+ */
+static int
+send_sync_message(PGconn *conn, bool flush)
 {
 	PGcmdQueueEntry *entry;
 
@@ -3189,7 +3201,7 @@ PQpipelineSync(PGconn *conn)
 	 * Give the data a push. In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (PQflush(conn) < 0)
+	if (flush && PQflush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
@@ -3238,6 +3250,16 @@ PQsendFlushRequest(PGconn *conn)
 	return 1;
 }
 
+/*
+ * PQsendSyncMessage
+ *		Send a Sync message as part of a pipeline without flushing to server
+ */
+int
+PQsendSyncMessage(PGconn *conn)
+{
+	return send_sync_message(conn, false);
+}
+
 /* ====== accessor funcs for PGresult ======== */
 
 ExecStatusType
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 7476dbe0e9..86efb2e764 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -474,6 +474,7 @@ extern int	PQenterPipelineMode(PGconn *conn);
 extern int	PQexitPipelineMode(PGconn *conn);
 extern int	PQpipelineSync(PGconn *conn);
 extern int	PQsendFlushRequest(PGconn *conn);
+extern int	PQsendSyncMessage(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index f48da7d963..72d3d33a22 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -169,6 +169,14 @@ test_multi_pipelines(PGconn *conn)
 	if (PQpipelineSync(conn) != 1)
 		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
 
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	/* Skip flushing once. */
+	if (PQsendSyncMessage(conn) != 1)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
 						  dummy_params, NULL, NULL, 0) != 1)
 		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
@@ -206,6 +214,35 @@ test_multi_pipelines(PGconn *conn)
 
 	/* second pipeline */
 
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* third pipeline */
+
 	res = PQgetResult(conn);
 	if (res == NULL)
 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca4..1ee21f61dc 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F	19	Bind	 "" "" 0 1 1 '1' 1 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
 F	4	Sync
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
 B	4	ParseComplete
 B	4	BindComplete
 B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
-- 
2.34.1
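
For readers of this thread who want to see the system call argument from the commit message in miniature, here is a toy model in plain C. It is not libpq code and does not talk to a server: `ToyConn`, `toy_queue`, `toy_flush`, `toy_sync` and `toy_run` are hypothetical names invented for this sketch. The only thing borrowed from the patch is the shape of `send_sync_message(conn, flush)`, i.e. queue a Sync message and flush only if asked to. The model assumes every flush with pending data costs exactly one write, which real sockets do not guarantee.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a connection's send buffer; not a libpq type. */
typedef struct
{
	char		buf[1024];
	size_t		used;		/* bytes queued but not yet written */
	int			flushes;	/* simulated write() system calls */
} ToyConn;

/* Queue a message in the buffer; returns 1 on success, 0 if it does not fit. */
static int
toy_queue(ToyConn *c, const char *msg)
{
	size_t		len = strlen(msg);

	if (len > sizeof(c->buf) - c->used)
		return 0;
	memcpy(c->buf + c->used, msg, len);
	c->used += len;
	return 1;
}

/* Simulated flush: counts one "system call" if anything is pending. */
static int
toy_flush(ToyConn *c)
{
	if (c->used > 0)
	{
		c->flushes++;
		c->used = 0;
	}
	return 0;
}

/* Same shape as send_sync_message() in the patch: queue, optionally flush. */
static int
toy_sync(ToyConn *c, bool flush)
{
	if (!toy_queue(c, "Sync"))
		return 0;
	if (flush && toy_flush(c) < 0)
		return 0;
	return 1;
}

/*
 * Send the given number of one-query pipelines, then flush once explicitly
 * (the step the caller owns when the sync itself does not flush).
 * Returns the number of simulated flushes, or -1 if the buffer overflowed.
 */
static int
toy_run(int pipelines, bool flush_each_sync)
{
	ToyConn		c = {0};

	assert(pipelines >= 0);
	for (int i = 0; i < pipelines; i++)
	{
		if (!toy_queue(&c, "Query") || !toy_sync(&c, flush_each_sync))
			return -1;
	}
	toy_flush(&c);
	return c.flushes;
}
```

In this model `toy_run(3, true)` counts three flushes (one per sync, as with `PQpipelineSync`), while `toy_run(3, false)` counts a single flush for all three pipelines, which is the saving the new function is after.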