Andres Freund wrote:

> FWIW, I still think this needs a pgbench or similar example integration,
> so we can actually properly measure the benefits.

Here's an updated version of the patch I made during review,
adding \beginbatch and \endbatch to pgbench.
The performance improvement appears clearly
with a custom script of this kind:
  \beginbatch
     UPDATE pgbench_branches SET bbalance = bbalance + 1 WHERE bid = 0;
     ..above repeated 1000 times...
  \endbatch

versus the same with a BEGIN; END; pair instead of \beginbatch \endbatch

On localhost on my desktop I tend to see a 30% difference in favor
of the batch mode with that kind of test.
On slower networks there are much bigger differences.

The latest main patch (v10) must also be slightly updated for HEAD,
because of this:
error: patch failed: src/interfaces/libpq/exports.txt:171
v11 attached without any other change.


Best regards,
-- 
Daniel Vérité
PostgreSQL-powered mailer: http://www.manitou-mail.org
Twitter: @DanielVerite
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index c3bd4f9..366f278 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4656,6 +4656,526 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch mode and query pipelining</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>pipelining</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> supports queueing up queries into
+   a pipeline to be executed as a batch on the server. Batching queries allows
+   applications to avoid a client/server round-trip after each query to get
+   the results before issuing the next query.
+  </para>
+
+  <para>
+   An example of batch use may be found in the source distribution in
+   <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename>.
+  </para>
+
+  <sect2>
+   <title>When to use batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It increases client application complexity
+    and extra caution is required to prevent client/server deadlocks but
+    can sometimes offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e. network latency
+    (<quote>ping time</quote>) is high, and when many small operations are 
being performed in
+    rapid sequence. There is usually less benefit in using batches when each
+    query takes many multiples of the client/server round-trip time to execute.
+    A 100-statement operation run on a server 300ms round-trip-time away would 
take
+    30 seconds in network latency alone without batching; with batching it may 
spend
+    as little as 0.3s waiting for results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed into
+    operations on sets or into a
+    <link linkend="libpq-copy"><literal>COPY</literal></link> operation.
+   </para>
+
+   <para>
+    Batching is less useful when information from one operation is required by 
the
+    client before it knows enough to send the next operation. The client must
+    introduce a synchronisation point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+     BEGIN;
+     SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+     -- result: x=2
+     -- client adds 1 to x:
+     UPDATE mytable SET x = 3 WHERE id = 42;
+     COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+     UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 10.0, but clients using 
PostgresSQL 10.0 version of libpq can
+     use batches on server versions 8.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using batch mode</title>
+
+   <para>
+    To issue batches the application must switch
+    a connection into batch mode. Enter batch mode with <link
+    
linkend="libpq-PQenterBatchMode"><function>PQenterBatchMode(conn)</function></link>
 or test
+    whether batch mode is active with <link
+    
linkend="libpq-PQbatchStatus"><function>PQbatchStatus(conn)</function></link>. 
In batch mode only <link
+    linkend="libpq-async">asynchronous operations</link> are permitted, and
+    <literal>COPY</literal> is not recommended as it most likely will trigger 
failure in batch processing. 
+    Using any synchronous command execution functions such as 
<function>PQfn</function>,
+    <function>PQexec</function> or one of its sibling functions are error 
conditions.
+    Functions allowed in batch mode are described in <xref 
linkend="libpq-batch-sending">. 
+   </para>
+
+   <para>
+    The client uses libpq's asynchronous query functions to dispatch work,
+    marking the end of each batch with <function>PQbatchSyncQueue</function>.
+    And to get results, it uses <function>PQgetResult</function> and
+    <function>PQbatchProcessQueue</function>. It may eventually exit
+    batch mode with <function>PQexitBatchMode</function> once all results are
+    processed.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-pqsetnonblocking">non-blocking mode</link>. If used 
in
+     blocking mode it is possible for a client/server deadlock to occur. The
+     client will block trying to send queries to the server, but the server 
will
+     block trying to send results from queries it has already processed to the
+     client. This only occurs when the client sends enough queries to fill its
+     output buffer and the server's receive buffer before switching to
+     processing input from the server, but it's hard to predict exactly when
+     that'll happen so it's best to always use non-blocking mode.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests
+     using normal asynchronous <application>libpq</application> functions such 
as 
+     <function>PQsendQueryParams</function>, 
<function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, 
<function>PQsendDescribePortal</function>,
+     <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a <link
+     
linkend="libpq-PQbatchSyncQueue"><function>PQbatchSyncQueue(conn)</function></link>
 call to mark
+     the end of the batch. The client <emphasis>does not</emphasis> need to 
call
+     <function>PQgetResult</function> immediately after dispatching each
+     operation. <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+    
+    <para>
+     Batched operations will be executed by the server in the order the client
+     sends them. The server will send the results in the order the statements
+     executed. The server may begin executing the batch before all commands
+     in the batch are queued and the end of batch command is sent. If any
+     statement encounters an error the server aborts the current transaction 
and
+     skips processing the rest of the batch. Query processing resumes after the
+     end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared 
statement
+     then execute it with later statements in the same batch.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing results</title>
+
+    <para>
+     The client <link linkend="libpq-batch-interleave">interleaves result
+     processing</link> with sending batch queries, or for small batches may
+     process all results after sending the whole batch.
+    </para>
+
+    <para>
+     To get the result of the first batch entry the client must call <link
+     
linkend="libpq-PQbatchProcessQueue"><function>PQbatchProcessQueue</function></link>.
 It must then call
+     <function>PQgetResult</function> and handle the results until
+     <function>PQgetResult</function> returns null. The result from the next 
batch entry 
+     may then be retrieved using <function>PQbatchProcessQueue</function> and 
the cycle repeated.  The
+     application handles individual statement results as normal.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function> 
immediately after a
+     successful call of <function>PQbatchProcessQueue</function>. This mode 
selection is effective 
+     only for the query currently being processed. For more information on the 
use of <function>PQsetSingleRowMode
+     </function>, refer to <xref linkend="libpq-single-row-mode">.
+     
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal 
asynchronous
+     processing except that it may contain the new <type>PGresult</type> types
+     <literal>PGRES_BATCH_END</literal> and 
<literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSyncQueue</function> call at the corresponding point in
+     the result stream and at no other time. 
<literal>PGRES_BATCH_ABORTED</literal>
+     is emitted during error handling; see <link linkend="libpq-batch-errors">
+     error handling</link>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed. The application
+     must keep track of the order in which it sent queries and the expected
+     results. Applications will typically use a state machine or a FIFO queue
+     for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSyncQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQbatchProcessQueue(...)</function> and
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving result processing and query dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured around
+     a nonblocking I/O loop using a function like <function>select</function>,
+     <function>poll</function>, <function>epoll</function>,
+     <function>WaitForMultipleObjectEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work still to
+     be dispatched and a queue of work that has been dispatched but not yet had
+     its results processed. When the socket is writable it should dispatch more
+     work. When the socket is readable it should read results and process them,
+     matching them up to the next entry in its expected results queue. Batches
+     should be scoped to logical units of work, usually (but not always) one
+     transaction per batch. There's no need to exit batch mode and re-enter it
+     between batches or to wait for one batch to finish before sending the 
next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state machine
+     to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename> in the 
PostgreSQL
+     source distribution.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending batch mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and the end 
batch
+     result has been consumed the application may return to non-batched mode 
with
+     <link 
linkend="libpq-PQexitBatchMode"><function>PQexitBatchMode(conn)</function></link>.
+    </para>
+   </sect3>
+
+  </sect2>
+
+  <sect2 id="libpq-funcs-batch">
+   <title>Functions associated with batch mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns current batch mode status of the 
<application>libpq</application> connection.
+<synopsis>
+int PQbatchStatus(PGconn *conn);
+</synopsis>
+      </para>                  
+      <variablelist>
+         <varlistentry id="libpq-PQbatchStatus-1">
+           <term>
+             <literal>PQBATCH_MODE_ON</literal>
+           </term>
+ 
+          <listitem>
+           <para>
+             Returns <literal>PQBATCH_MODE_ON</literal> if 
<application>libpq</application> connection is in <link
+             linkend="libpq-batch-mode">batch mode</link>.
+           </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-2">
+          <term>
+            <literal>PQBATCH_MODE_OFF</literal>
+          </term>
+  
+          <listitem>
+          <para>
+            Returns <literal>PQBATCH_MODE_OFF</literal> if 
<application>libpq</application> connection is not in <link
+            linkend="libpq-batch-mode">batch mode</link>.
+          </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-3">
+          <term>
+            <literal>PQBATCH_MODE_ABORTED</literal>
+          </term>
+          <listitem>
+            <para>
+                Returns <literal>PQBATCH_MODE_ABORTED</literal> if 
<application>libpq</application> connection is in 
+                aborted status. The aborted flag is cleared as soon as the 
result of the 
+                <function>PQbatchSyncQueue</function> at the end of the 
aborted batch is 
+                processed. Clients don't usually need this function to verify 
aborted status 
+                as they can tell that the batch is aborted from 
<literal>PGRES_BATCH_ABORTED</literal> 
+                result codes.
+            </para>
+          </listitem>
+        </varlistentry>
+  
+       </variablelist>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+        </para>
+        <para>
+          Returns 1 for success. Returns 0 and has no 
+          effect if the connection is not currently idle, i.e. it has a result 
+          ready, is waiting for more input from the server, etc. This function 
+          does not actually send anything to the server, it just changes the 
+          <application>libpq</application> connection state.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to exit batch mode if it is currently in batch mode
+      with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success.
+      Returns 1 and takes no action if not in batch mode. If the connection has
+      pending batch items in the queue for reading with
+      <function>PQbatchProcessQueue</function>, the current statement isn't 
finished
+      processing or there are results pending for collection with
+      <function>PQgetResult</function>, returns 0 and does nothing.
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSyncQueue">
+     <term>
+      <function>PQbatchSyncQueue</function>
+      <indexterm>
+       <primary>PQbatchSyncQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Delimits the end of a set of a batched commands by sending a <link
+      linkend="protocol-flow-ext-query">sync message</link> and flushing
+      the send buffer. The end of a batch serves as 
+      the delimiter of an implicit transaction and
+      an error recovery point; see <link linkend="libpq-batch-errors">
+      error handling</link>.
+
+<synopsis>
+int PQbatchSyncQueue(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success. Returns 0 if the connection is not in 
batch mode
+              or sending a <link linkend="protocol-flow-ext-query">sync 
message</link> is failed.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchProcessQueue">
+     <term>
+      <function>PQbatchProcessQueue</function>
+      <indexterm>
+       <primary>PQbatchProcessQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes the connection to start processing the next queued query's
+      results. 
+     </para>
+
+<synopsis>
+int PQbatchProcessQueue(PGconn *conn);
+</synopsis>
+
+     <para>
+      Returns 1 if a new query was popped from the result queue
+      for processing. Returns 0 and has no effect if there are no query results
+      pending, batch mode is not enabled, or if the query currently processed
+      is incomplete or still has pending results. Reason for these failures 
can 
+      be verified with <function>PQbatchQueueCount</function>, 
<function>PQbatchStatus
+      </function> and <function>PQgetResult</function>.
+      See <link linkend="libpq-batch-results">processing results</link>.
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchQueueCount">
+     <term>
+      <function>PQbatchQueueCount</function>
+      <indexterm>
+       <primary>PQbatchQueueCount</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns the number of queries still in the queue for this batch, not
+      including any query that's currently having results being processed.
+      This is the number of times <function>PQbatchProcessQueue</function> has 
to be
+      called before the query queue is empty again.
+
+<synopsis>
+int PQbatchQueueCount(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+
+   </variablelist>
+
+  </sect2>
+
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-By-Row</title>
 
@@ -4696,6 +5216,14 @@ int PQflush(PGconn *conn);
    Each object should be freed with <function>PQclear</function> as usual.
   </para>
 
+  <note>
+    <para>
+     On using batch mode, call <function>PQsetSingleRowMode</function>
+     immediately after a successful call of 
<function>PQbatchProcessQueue</function>
+     See <xref linkend="libpq-batch-mode"> for more information.
+    </para>
+   </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-pqsetsinglerowmode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 7757e1e..db8523d 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while libpq connection is 
in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f6fa0e4..d4ee576 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -936,6 +936,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
                        walres->status = WALRCV_ERROR;
                        walres->err = pchomp(PQerrorMessage(conn->streamConn));
                        break;
+               default:
+               /* This is just to keep compiler quiet */
+                       break;
        }
 
        PQclear(pgres);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index d6a38d0..49871f5 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -172,3 +172,9 @@ PQsslAttribute            169
 PQsetErrorContextVisibility 170
 PQresultVerboseErrorMessage 171
 PQencryptPasswordConn     172
+PQbatchQueueCount        173
+PQenterBatchMode         174
+PQexitBatchMode           175
+PQbatchSyncQueue         176
+PQbatchProcessQueue      177
+PQbatchStatus            178
diff --git a/src/interfaces/libpq/fe-connect.c 
b/src/interfaces/libpq/fe-connect.c
index 02ec8f0..a92f4bc 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -3483,6 +3483,25 @@ sendTerminateConn(PGconn *conn)
 }
 
 /*
+ * PQfreeCommandQueue
+ * Free all the entries of PGcommandQueueEntry queue passed.
+ */
+static void
+PQfreeCommandQueue(PGcommandQueueEntry *queue)
+{
+
+       while (queue != NULL)
+       {
+               PGcommandQueueEntry *prev = queue;
+
+               queue = queue->next;
+               if (prev->query)
+                       free(prev->query);
+               free(prev);
+       }
+}
+
+/*
  * closePGconn
  *      - properly close a connection to the backend
  *
@@ -3494,6 +3513,7 @@ static void
 closePGconn(PGconn *conn)
 {
        PGnotify   *notify;
+       PGcommandQueueEntry *queue;
        pgParameterStatus *pstatus;
 
        sendTerminateConn(conn);
@@ -3526,6 +3546,14 @@ closePGconn(PGconn *conn)
                free(prev);
        }
        conn->notifyHead = conn->notifyTail = NULL;
+       queue = conn->cmd_queue_head;
+       PQfreeCommandQueue(queue);
+       conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+       queue = conn->cmd_queue_recycle;
+       PQfreeCommandQueue(queue);
+
+       conn->cmd_queue_recycle = NULL;
        pstatus = conn->pstatus;
        while (pstatus != NULL)
        {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 9decd53..2a3928b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char     *const pgresStatus[] = {
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR",
        "PGRES_COPY_BOTH",
-       "PGRES_SINGLE_TUPLE"
+       "PGRES_SINGLE_TUPLE",
+       "PGRES_BATCH_END",
+       "PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                           const char *desc_target);
 static int     check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * 
entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * 
entry);
 
 
 /* ----------------
@@ -1108,7 +1113,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
                conn->next_result = conn->result;
                conn->result = res;
                /* And mark the result ready to return */
-               conn->asyncStatus = PGASYNC_READY;
+               conn->asyncStatus = PGASYNC_READY_MORE;
        }
 
        return 1;
@@ -1131,6 +1136,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("cannot 
PQsendQuery in batch mode, use PQsendQueryParams\n"));
+               return false;
+       }
+
        if (!PQsendQueryStart(conn))
                return 0;
 
@@ -1229,6 +1241,10 @@ PQsendPrepare(PGconn *conn,
                          const char *stmtName, const char *query,
                          int nParams, const Oid *paramTypes)
 {
+       PGcommandQueueEntry *pipeCmd = NULL;
+       char      **last_query;
+       PGQueryClass *queryclass;
+
        if (!PQsendQueryStart(conn))
                return 0;
 
@@ -1287,18 +1303,34 @@ PQsendPrepare(PGconn *conn,
                goto sendFailed;
 
        /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+
+               last_query = &conn->last_query;
+               queryclass = &conn->queryclass;
+       }
+       else
+       {
+               pipeCmd = PQmakePipelinedCommand(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+
+               last_query = &pipeCmd->query;
+               queryclass = &pipeCmd->queryclass;
+       }
 
        /* remember we are doing just a Parse */
-       conn->queryclass = PGQUERY_PREPARE;
+       *queryclass = PGQUERY_PREPARE;
 
        /* and remember the query text too, if possible */
        /* if insufficient memory, last_query just winds up NULL */
-       if (conn->last_query)
-               free(conn->last_query);
-       conn->last_query = strdup(query);
+       if (*last_query)
+               free(*last_query);
+       *last_query = strdup(query);
 
        /*
         * Give the data a push.  In nonblock mode, don't complain if we're 
unable
@@ -1308,10 +1340,14 @@ PQsendPrepare(PGconn *conn,
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               PQappendPipelinedCommand(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       PQrecyclePipelinedCommand(conn, pipeCmd);
        pqHandleSendFailure(conn);
        return 0;
 }
@@ -1359,7 +1395,80 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQmakePipelinedCommand
+ *     Get a new command queue entry, allocating it if required. Doesn't add 
it to
+ *     the tail of the queue yet, use PQappendPipelinedCommand once the 
command has
+ *     been written for that. If a command fails once it's called this, it 
should
+ *     use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry *
+PQmakePipelinedCommand(PGconn *conn)
+{
+       PGcommandQueueEntry *entry;
+
+       if (conn->cmd_queue_recycle == NULL)
+       {
+               entry = (PGcommandQueueEntry *) 
malloc(sizeof(PGcommandQueueEntry));
+               if (entry == NULL)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("out of 
memory\n"));
+                       return NULL;
+               }
+       }
+       else
+       {
+               entry = conn->cmd_queue_recycle;
+               conn->cmd_queue_recycle = entry->next;
+       }
+       entry->next = NULL;
+       entry->query = NULL;
+
+       return entry;
+}
+
+/*
+ * PQappendPipelinedCommand
+ *     Append a precreated command queue entry to the queue after it's been
+ *     sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+       if (conn->cmd_queue_head == NULL)
+               conn->cmd_queue_head = entry;
+       else
+               conn->cmd_queue_tail->next = entry;
+       conn->cmd_queue_tail = entry;
+}
+
+/*
+ * PQrecyclePipelinedCommand
+ *     Push a command queue entry onto the freelist. It must be a dangling 
entry
+ *     with null next pointer and not referenced by any other entry's next 
pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+       if (entry == NULL)
+               return;
+       if (entry->next != NULL)
+       {
+               fprintf(stderr, libpq_gettext("tried to recycle non-dangling 
command queue entry"));
+               abort();
+       }
+       if (entry->query)
+               free(entry->query);
+
+       entry->next = conn->cmd_queue_recycle;
+       conn->cmd_queue_recycle = entry;
+}
+
+/*
+ * PQsendQueryStart
+ *     Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1377,20 +1486,60 @@ PQsendQueryStart(PGconn *conn)
                                                  libpq_gettext("no connection 
to the server\n"));
                return false;
        }
-       /* Can't send while already busy, either. */
-       if (conn->asyncStatus != PGASYNC_IDLE)
+
+       /* Can't send while already busy, either, unless enqueuing for later */
+       if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == 
PQBATCH_MODE_OFF)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                  libpq_gettext("another command is already in 
progress\n"));
                return false;
        }
 
-       /* initialize async result-accumulation state */
-       pqClearAsyncResult(conn);
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               /*
+                * When enqueuing a message we don't change much of the 
connection
+                * state since it's already in use for the current command. The
+                * connection state will get updated when 
PQbatchQueueProcess(...)
+                * advances to start processing the queued message.
+                *
+                * Just make sure we can safely enqueue given the current 
connection
+                * state. We can enqueue behind another queue item, or behind a
+                * non-queue command (one that sends its own sync), but we can't
+                * enqueue if the connection is in a copy state.
+                */
+               switch (conn->asyncStatus)
+               {
+                       case PGASYNC_QUEUED:
+                       case PGASYNC_READY:
+                       case PGASYNC_READY_MORE:
+                       case PGASYNC_BUSY:
+                               /* ok to queue */
+                               break;
+                       case PGASYNC_COPY_IN:
+                       case PGASYNC_COPY_OUT:
+                       case PGASYNC_COPY_BOTH:
+                               printfPQExpBuffer(&conn->errorMessage,
+                                          libpq_gettext("cannot queue commands 
during COPY\n"));
+                               return false;
+                               break;
+                       case PGASYNC_IDLE:
+                               printfPQExpBuffer(&conn->errorMessage,
+                                                                 
libpq_gettext_noop("internal error, idle state in batch mode"));
+                               break;
+               }
+       }
+       else
+       {
+               /* This command's results will come in immediately.
+                * Initialize async result-accumulation state
+                */
+               pqClearAsyncResult(conn);
 
-       /* reset single-row processing mode */
-       conn->singleRowMode = false;
+               /* reset single-row processing mode */
+               conn->singleRowMode = false;
 
+       }
        /* ready to send command message */
        return true;
 }
@@ -1414,6 +1563,10 @@ PQsendQueryGuts(PGconn *conn,
                                int resultFormat)
 {
        int                     i;
+       PGcommandQueueEntry *pipeCmd = NULL;
+       char      **last_query;
+       PGQueryClass *queryclass;
+
 
        /* This isn't gonna work on a 2.0 server */
        if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1423,6 +1576,23 @@ PQsendQueryGuts(PGconn *conn,
                return 0;
        }
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               pipeCmd = PQmakePipelinedCommand(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+
+               last_query = &pipeCmd->query;
+               queryclass = &pipeCmd->queryclass;
+       }
+       else
+       {
+               last_query = &conn->last_query;
+               queryclass = &conn->queryclass;
+       }
+
+
        /*
         * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
         * using specified statement name and the unnamed portal.
@@ -1535,22 +1705,25 @@ PQsendQueryGuts(PGconn *conn,
                pqPutMsgEnd(conn) < 0)
                goto sendFailed;
 
-       /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               /* construct the Sync message */
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+       }
 
        /* remember we are using extended query protocol */
-       conn->queryclass = PGQUERY_EXTENDED;
+       *queryclass = PGQUERY_EXTENDED;
 
        /* and remember the query text too, if possible */
        /* if insufficient memory, last_query just winds up NULL */
-       if (conn->last_query)
-               free(conn->last_query);
+       if (*last_query)
+               free(*last_query);
        if (command)
-               conn->last_query = strdup(command);
+               *last_query = strdup(command);
        else
-               conn->last_query = NULL;
+               *last_query = NULL;
 
        /*
         * Give the data a push.  In nonblock mode, don't complain if we're 
unable
@@ -1560,10 +1733,14 @@ PQsendQueryGuts(PGconn *conn,
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               PQappendPipelinedCommand(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       PQrecyclePipelinedCommand(conn, pipeCmd);
        pqHandleSendFailure(conn);
        return 0;
 }
@@ -1690,6 +1867,302 @@ PQisBusy(PGconn *conn)
        return conn->asyncStatus == PGASYNC_BUSY;
 }
 
+/*
+ * PQbatchQueueCount
+ *     Return number of queries currently pending in batch mode
+ */
+int
+PQbatchQueueCount(PGconn *conn)
+{
+       int                     count = 0;
+       PGcommandQueueEntry *entry;
+
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               return 0;
+
+       entry = conn->cmd_queue_head;
+       while (entry != NULL)
+       {
+               ++count;
+               entry = entry->next;
+       }
+       return count;
+}
+
+/*
+ * PQbatchStatus
+ *     Returns current batch mode status
+ */
+int
+PQbatchStatus(PGconn *conn)
+{
+       if (!conn)
+               return FALSE;
+
+       return conn->batch_status;
+}
+
+/*
+ * PQbatchBegin
+ *     Put an idle connection in batch mode. Commands submitted after this
+ *     can be pipelined on the connection, there's no requirement to wait for
+ *     one to finish before the next is dispatched.
+ *
+ *     Queuing of new query or syncing during COPY is not allowed.
+ *
+ *     A set of commands is terminated by a PQbatchQueueSync. Multiple sets of 
batched
+ *     commands may be sent while in batch mode. Batch mode can be exited by
+ *     calling PQbatchEnd() once all results are processed.
+ *
+ *     This doesn't actually send anything on the wire, it just puts libpq
+ *     into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+       if (!conn)
+               return false;
+
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               return true;
+
+       if (conn->asyncStatus != PGASYNC_IDLE)
+               return false;
+
+       conn->batch_status = PQBATCH_MODE_ON;
+       conn->asyncStatus = PGASYNC_QUEUED;
+
+       return true;
+}
+
+/*
+ * PQbatchEnd
+ *     End batch mode and return to normal command mode.
+ *
+ *     Has no effect unless the client has processed all results
+ *     from all outstanding batches and the connection is idle.
+ *
+ *     Returns true if batch mode ended.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+       if (!conn)
+               return false;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+               return true;
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_IDLE:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, IDLE in 
batch mode"));
+                       break;
+               case PGASYNC_COPY_IN:
+               case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_BOTH:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, COPY in 
batch mode"));
+                       break;
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+               case PGASYNC_BUSY:
+                       /* can't end batch while busy */
+                       return false;
+               case PGASYNC_QUEUED:
+                       break;
+       }
+
+       /* still work to process */
+       if (conn->cmd_queue_head != NULL)
+               return false;
+
+       conn->batch_status = PQBATCH_MODE_OFF;
+       conn->asyncStatus = PGASYNC_IDLE;
+
+       return true;
+}
+
+/*
+ * PQbatchQueueSync
+ *     End a batch submission by sending a protocol sync. The connection will
+ *     remain in batch mode and unavailable for new non-batch commands until 
all
+ *     results from the batch are processed by the client.
+ *
+ *     It's legal to start submitting another batch immediately, without 
waiting
+ *     for the results of the current batch. There's no need to end batch mode
+ *     and start it again.
+ *
+ *     If a command in a batch fails, every subsequent command up to and 
including
+ *     the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED 
state. If the
+ *     whole batch is processed without error, a PGresult with PGRES_BATCH_END 
is
+ *     produced.
+ */
+int
+PQbatchSyncQueue(PGconn *conn)
+{
+       PGcommandQueueEntry *entry;
+
+       if (!conn)
+               return false;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+               return false;
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_IDLE:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, IDLE in 
batch mode"));
+                       break;
+               case PGASYNC_COPY_IN:
+               case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_BOTH:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, COPY in 
batch mode"));
+                       break;
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+               case PGASYNC_BUSY:
+               case PGASYNC_QUEUED:
+                       /* can send sync to end this batch of cmds */
+                       break;
+       }
+
+       entry = PQmakePipelinedCommand(conn);
+       if (entry == NULL)
+               return false;                   /* error msg already set */
+
+       entry->queryclass = PGQUERY_SYNC;
+       entry->query = NULL;
+
+       /* construct the Sync message */
+       if (pqPutMsgStart('S', false, conn) < 0 ||
+               pqPutMsgEnd(conn) < 0)
+               goto sendFailed;
+
+       PQappendPipelinedCommand(conn, entry);
+
+       /*
+        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
+        * to send it all; PQgetResult() will do any additional flushing needed.
+        */
+       if (PQflush(conn) < 0)
+               goto sendFailed;
+
+       return true;
+
+sendFailed:
+       PQrecyclePipelinedCommand(conn, entry);
+       pqHandleSendFailure(conn);
+       return false;
+}
+
+/*
+ * PQbatchQueueProcess
+ *      In batch mode, start processing the next query in the queue.
+ *
+ * Returns true if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns false if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQbatchProcessQueue(PGconn *conn)
+{
+       PGcommandQueueEntry *next_query;
+
+       if (!conn)
+               return false;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+               return false;
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_COPY_IN:
+               case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_BOTH:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, COPY in 
batch mode"));
+                       break;
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+               case PGASYNC_BUSY:
+                       /* client still has to process current query or results 
*/
+                       return false;
+                       break;
+               case PGASYNC_IDLE:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext_noop("internal error, IDLE in 
batch mode"));
+                       break;
+               case PGASYNC_QUEUED:
+                       /* next query please */
+                       break;
+       }
+
+       if (conn->cmd_queue_head == NULL)
+       {
+               /*
+                * In batch mode but nothing left on the queue; caller can 
submit more
+                * work or PQbatchEnd() now.
+                */
+               return false;
+       }
+
+       /*
+        * Pop the next query from the queue and set up the connection state as 
if
+        * it'd just been dispatched from a non-batched call
+        */
+       next_query = conn->cmd_queue_head;
+       conn->cmd_queue_head = next_query->next;
+       next_query->next = NULL;
+
+       /* This command's results will come in immediately.
+        * Initialize async result-accumulation state */
+       pqClearAsyncResult(conn);
+
+       /* reset single-row processing mode */
+       conn->singleRowMode = false;
+
+
+       conn->last_query = next_query->query;
+       next_query->query = NULL;
+       conn->queryclass = next_query->queryclass;
+
+       PQrecyclePipelinedCommand(conn, next_query);
+
+       if (conn->batch_status == PQBATCH_MODE_ABORTED && conn->queryclass != 
PGQUERY_SYNC)
+       {
+               /*
+                * In an aborted batch we don't get anything from the server 
for each
+                * result; we're just discarding input until we get to the next 
sync
+                * from the server. The client needs to know its queries got 
aborted
+                * so we create a fake PGresult to return immediately from
+                * PQgetResult.
+                */
+               conn->result = PQmakeEmptyPGresult(conn,
+                                                                               
   PGRES_BATCH_ABORTED);
+               if (!conn->result)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("out of 
memory"));
+                       pqSaveErrorResult(conn);
+                       return false;
+               }
+               conn->asyncStatus = PGASYNC_READY;
+       }
+       else
+       {
+               /* allow parsing to continue */
+               conn->asyncStatus = PGASYNC_BUSY;
+       }
+
+       return true;
+}
+
 
 /*
  * PQgetResult
@@ -1749,10 +2222,32 @@ PQgetResult(PGconn *conn)
        switch (conn->asyncStatus)
        {
                case PGASYNC_IDLE:
+               case PGASYNC_QUEUED:
                        res = NULL;                     /* query is complete */
                        break;
                case PGASYNC_READY:
                        res = pqPrepareAsyncResult(conn);
+                       if (conn->batch_status != PQBATCH_MODE_OFF)
+                       {
+                               /*
+                                * batched queries aren't followed by a Sync to 
put us back in
+                                * PGASYNC_IDLE state, and when we do get a 
sync we could
+                                * still have another batch coming after this 
one.
+                                *
+                                * The connection isn't idle since we can't 
submit new
+                                * nonbatched commands. It isn't also busy 
since the current
+                                * command is done and we need to process a new 
one.
+                                */
+                               conn->asyncStatus = PGASYNC_QUEUED;
+                       }
+                       else
+                       {
+                               /* Set the state back to BUSY, allowing parsing 
to proceed. */
+                               conn->asyncStatus = PGASYNC_BUSY;
+                       }
+                       break;
+               case PGASYNC_READY_MORE:
+                       res = pqPrepareAsyncResult(conn);
                        /* Set the state back to BUSY, allowing parsing to 
proceed. */
                        conn->asyncStatus = PGASYNC_BUSY;
                        break;
@@ -1932,6 +2427,13 @@ PQexecStart(PGconn *conn)
        if (!conn)
                return false;
 
+       if (conn->asyncStatus == PGASYNC_QUEUED || conn->batch_status != 
PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("Synchronous 
command execution functions are not allowed in batch mode\n"));
+               return false;
+       }
+
        /*
         * Silently discard any prior query result that application didn't eat.
         * This is probably poor design, but it's here for backward 
compatibility.
@@ -2126,6 +2628,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+       PGcommandQueueEntry *pipeCmd = NULL;
+       PGQueryClass *queryclass;
+
        /* Treat null desc_target as empty string */
        if (!desc_target)
                desc_target = "";
@@ -2141,6 +2646,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char 
*desc_target)
                return 0;
        }
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               pipeCmd = PQmakePipelinedCommand(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+
+               queryclass = &pipeCmd->queryclass;
+       }
+       else
+       {
+               queryclass = &conn->queryclass;
+       }
+
        /* construct the Describe message */
        if (pqPutMsgStart('D', false, conn) < 0 ||
                pqPutc(desc_type, conn) < 0 ||
@@ -2149,15 +2668,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char 
*desc_target)
                goto sendFailed;
 
        /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+       }
 
        /* remember we are doing a Describe */
-       conn->queryclass = PGQUERY_DESCRIBE;
+       *queryclass = PGQUERY_DESCRIBE;
 
        /* reset last-query string (not relevant now) */
-       if (conn->last_query)
+       if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
        {
                free(conn->last_query);
                conn->last_query = NULL;
@@ -2171,10 +2693,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char 
*desc_target)
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               PQappendPipelinedCommand(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       PQrecyclePipelinedCommand(conn, pipeCmd);
        pqHandleSendFailure(conn);
        return 0;
 }
@@ -2569,6 +3095,13 @@ PQfn(PGconn *conn,
        /* clear the error string */
        resetPQExpBuffer(&conn->errorMessage);
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                               libpq_gettext("Synchronous 
command execution functions are not allowed in batch mode\n"));
+               return NULL;
+       }
+
        if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE 
||
                conn->result != NULL)
        {
diff --git a/src/interfaces/libpq/fe-protocol2.c 
b/src/interfaces/libpq/fe-protocol2.c
index 3b0500f..c01f1a2 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn)
 {
        char            id;
 
+       if (conn->asyncStatus == PGASYNC_QUEUED || conn->batch_status != 
PQBATCH_MODE_OFF)
+       {
+               fprintf(stderr, "internal error, attempt to read v2 protocol in 
batch mode");
+               abort();
+       }
+
        /*
         * Loop to parse successive complete messages available in the buffer.
         */
diff --git a/src/interfaces/libpq/fe-protocol3.c 
b/src/interfaces/libpq/fe-protocol3.c
index 53776e2..e24d7ce 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -220,10 +220,18 @@ pqParseInput3(PGconn *conn)
                                                return;
                                        conn->asyncStatus = PGASYNC_READY;
                                        break;
-                               case 'Z':               /* backend is ready for 
new query */
+                               case 'Z':               /* sync response, 
backend is ready for new query */
                                        if (getReadyForQuery(conn))
                                                return;
-                                       conn->asyncStatus = PGASYNC_IDLE;
+                                       if (conn->batch_status != 
PQBATCH_MODE_OFF)
+                                       {
+                                               conn->batch_status = 
PQBATCH_MODE_ON;
+                                               conn->result = 
PQmakeEmptyPGresult(conn,
+                                                               
PGRES_BATCH_END);
+                                               conn->asyncStatus = 
PGASYNC_READY;
+                                       }
+                                       else
+                                               conn->asyncStatus = 
PGASYNC_IDLE;
                                        break;
                                case 'I':               /* empty query */
                                        if (conn->result == NULL)
@@ -880,6 +888,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
        PQExpBufferData workBuf;
        char            id;
 
+       if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+               conn->batch_status = PQBATCH_MODE_ABORTED;
+
        /*
         * Since the fields might be pretty long, we create a temporary
         * PQExpBuffer rather than using conn->workBuffer.  workBuffer is 
intended
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index e7496c5..321fc60 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -95,7 +95,10 @@ typedef enum
        PGRES_NONFATAL_ERROR,           /* notice or warning message */
        PGRES_FATAL_ERROR,                      /* query failed */
        PGRES_COPY_BOTH,                        /* Copy In/Out data transfer in 
progress */
-       PGRES_SINGLE_TUPLE                      /* single tuple from larger 
resultset */
+       PGRES_SINGLE_TUPLE,                     /* single tuple from larger 
resultset */
+       PGRES_BATCH_END,                        /* end of a batch of commands */
+       PGRES_BATCH_ABORTED,            /* Command didn't run because of an 
abort
+                                                                * earlier in a 
batch */
 } ExecStatusType;
 
 typedef enum
@@ -134,6 +137,17 @@ typedef enum
        PQPING_NO_ATTEMPT                       /* connection not attempted 
(bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+       PQBATCH_MODE_OFF,
+       PQBATCH_MODE_ON,
+       PQBATCH_MODE_ABORTED
+}      PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -425,6 +439,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int     PQisBusy(PGconn *conn);
 extern int     PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int     PQbatchStatus(PGconn *conn);
+extern int     PQbatchQueueCount(PGconn *conn);
+extern int     PQenterBatchMode(PGconn *conn);
+extern int     PQexitBatchMode(PGconn *conn);
+extern int     PQbatchSyncQueue(PGconn *conn);
+extern int     PQbatchProcessQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 335568b..619f5c0 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -215,10 +215,15 @@ typedef enum
 {
        PGASYNC_IDLE,                           /* nothing's happening, dude */
        PGASYNC_BUSY,                           /* query in progress */
-       PGASYNC_READY,                          /* result ready for PQgetResult 
*/
+       PGASYNC_READY,                          /* query done, waiting for 
client to fetch
+                                                                * result */
+       PGASYNC_READY_MORE,                     /* query done, waiting for 
client to fetch
+                                                                * result, More 
results expected from this
+                                                                * query */
        PGASYNC_COPY_IN,                        /* Copy In data transfer in 
progress */
        PGASYNC_COPY_OUT,                       /* Copy Out data transfer in 
progress */
-       PGASYNC_COPY_BOTH                       /* Copy In/Out data transfer in 
progress */
+       PGASYNC_COPY_BOTH,                      /* Copy In/Out data transfer in 
progress */
+       PGASYNC_QUEUED                          /* Current query done, more in 
queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -227,7 +232,8 @@ typedef enum
        PGQUERY_SIMPLE,                         /* simple Query protocol 
(PQexec) */
        PGQUERY_EXTENDED,                       /* full Extended protocol 
(PQexecParams) */
        PGQUERY_PREPARE,                        /* Parse only (PQprepare) */
-       PGQUERY_DESCRIBE                        /* Describe Statement or Portal 
*/
+       PGQUERY_DESCRIBE,                       /* Describe Statement or Portal 
*/
+       PGQUERY_SYNC                            /* A protocol sync to end a 
batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -297,6 +303,22 @@ typedef enum pg_conn_host_type
        CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+       PGQueryClass queryclass;        /* Query type; PGQUERY_SYNC for sync 
msg */
+       char       *query;                      /* SQL command, or NULL if 
unknown */
+       struct pgCommandQueueEntry *next;
+}      PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about one of possibly several hosts
  * mentioned in the connection string.  Derived by splitting the pghost
@@ -384,6 +406,7 @@ struct pg_conn
        bool            options_valid;  /* true if OK to attempt connection */
        bool            nonblocking;    /* whether this connection is using 
nonblock
                                                                 * sending 
semantics */
+       PQBatchStatus batch_status; /* Batch(pipelining) mode status of 
connection */
        bool            singleRowMode;  /* return current query result 
row-by-row? */
        char            copy_is_binary; /* 1 = copy binary, 0 = copy text */
        int                     copy_already_done;              /* # bytes 
already returned in COPY
@@ -396,6 +419,16 @@ struct pg_conn
        int                     whichhost;              /* host we're currently 
considering */
        pg_conn_host *connhost;         /* details about each possible host */
 
+       /*
+        * The command queue
+        *
+        * head is the next pending cmd, tail is where we append new commands.
+        * Freed entries for recycling go on the recycle linked list.
+        */
+       PGcommandQueueEntry *cmd_queue_head;
+       PGcommandQueueEntry *cmd_queue_tail;
+       PGcommandQueueEntry *cmd_queue_recycle;
+
        /* Connection data */
        pgsocket        sock;                   /* FD for socket, 
PGINVALID_SOCKET if
                                                                 * unconnected 
*/
@@ -684,6 +717,8 @@ extern char *libpq_ngettext(const char *msgid, const char 
*msgid_plural, unsigne
 #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p))
 #endif
 
+#define libpq_gettext_noop(x) (x)
+
 /*
  * These macros are needed to let error-handling code be portable between
  * Unix and Windows.  (ugh)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 14aa587..016455e 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -269,7 +269,8 @@ typedef enum
         *
         * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
         * command, the command is sent to the server, and we move to
-        * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is 
set,
+        * CSTATE_WAIT_RESULT state unless in batch mode.
+        * On a \sleep meta-command, the timer is set,
         * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
         * meta-commands are executed immediately.
         *
@@ -1882,11 +1883,24 @@ sendCommand(CState *st, Command *command)
                                if (commands[j]->type != SQL_COMMAND)
                                        continue;
                                preparedStatementName(name, st->use_file, j);
-                               res = PQprepare(st->con, name,
-                                                 commands[j]->argv[0], 
commands[j]->argc - 1, NULL);
-                               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                       fprintf(stderr, "%s", 
PQerrorMessage(st->con));
-                               PQclear(res);
+                               if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF)
+                               {
+                                       res = PQprepare(st->con, name,
+                                                                       
commands[j]->argv[0], commands[j]->argc - 1, NULL);
+                                       if (PQresultStatus(res) != 
PGRES_COMMAND_OK)
+                                               fprintf(stderr, "%s", 
PQerrorMessage(st->con));
+                                       PQclear(res);
+                               }
+                               else
+                               {
+                                       /*
+                                        * In batch mode, we use asynchronous 
functions. If a server-side
+                                        * error occurs, it will be processed 
later among the other results.
+                                        */
+                                       if (!PQsendPrepare(st->con, name,
+                                                                          
commands[j]->argv[0], commands[j]->argc - 1, NULL))
+                                               fprintf(stderr, "%s", 
PQerrorMessage(st->con));
+                               }
                        }
                        st->prepared[st->use_file] = true;
                }
@@ -2165,7 +2179,13 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                                return;
                                        }
                                        else
-                                               st->state = CSTATE_WAIT_RESULT;
+                                       {
+                                               /* Wait for results, unless in 
batch mode */
+                                               if (PQbatchStatus(st->con) == 
PQBATCH_MODE_OFF)
+                                                       st->state = 
CSTATE_WAIT_RESULT;
+                                               else
+                                                       st->state = 
CSTATE_END_COMMAND;
+                                       }
                                }
                                else if (command->type == META_COMMAND)
                                {
@@ -2207,7 +2227,51 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        }
                                        else
                                        {
-                                               if (pg_strcasecmp(argv[0], 
"set") == 0)
+                                               if (pg_strcasecmp(argv[0], 
"beginbatch") == 0)
+                                               {
+                                                       /*
+                                                        * In batch mode, we 
use a workflow based on libpq batch
+                                                        * functions.
+                                                        */
+                                                       if (querymode == 
QUERY_SIMPLE)
+                                                       {
+                                                               
commandFailed(st, "cannot use batch mode with the simple query protocol");
+                                                               st->state = 
CSTATE_ABORTED;
+                                                               break;
+                                                       }
+
+                                                       if 
(PQbatchStatus(st->con) != PQBATCH_MODE_OFF)
+                                                       {
+                                                               
commandFailed(st, "already in batch mode");
+                                                               break;
+                                                       }
+                                                       if 
(PQenterBatchMode(st->con) == 0)
+                                                       {
+                                                               
commandFailed(st, "failed to start a batch");
+                                                               break;
+                                                       }
+                                               }
+                                               else if (pg_strcasecmp(argv[0], 
"endbatch") == 0)
+                                               {
+                                                       if 
(PQbatchStatus(st->con) != PQBATCH_MODE_ON)
+                                                       {
+                                                               
commandFailed(st, "not in batch mode");
+                                                               break;
+                                                       }
+                                                       if 
(!PQbatchSyncQueue(st->con))
+                                                       {
+                                                               
commandFailed(st, "failed to end the batch");
+                                                               st->state = 
CSTATE_ABORTED;
+                                                               break;
+                                                       }
+                                                       if 
(PQexitBatchMode(st->con) == 0)
+                                                       {
+                                                               /* collect 
pending results before getting out of batch mode */
+                                                               st->state = 
CSTATE_WAIT_RESULT;
+                                                               break;
+                                                       }
+                                               }
+                                               else if (pg_strcasecmp(argv[0], 
"set") == 0)
                                                {
                                                        PgBenchExpr *expr = 
command->expr;
                                                        PgBenchValue result;
@@ -2279,6 +2343,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                }
                                break;
 
+
                                /*
                                 * Wait for the current SQL command to complete
                                 */
@@ -2295,6 +2360,13 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                if (PQisBusy(st->con))
                                        return;         /* don't have the whole 
result yet */
 
+                               if (PQbatchStatus(st->con) == PQBATCH_MODE_ON &&
+                                       !PQbatchProcessQueue(st->con))
+                               {
+                                       /* no complete result yet in batch 
mode*/
+                                       return;
+                               }
+
                                /*
                                 * Read and discard the query result;
                                 */
@@ -2307,7 +2379,22 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                                /* OK */
                                                PQclear(res);
                                                discard_response(st);
-                                               st->state = CSTATE_END_COMMAND;
+                                               /*
+                                                * In non-batch mode, only one 
result per command is expected.
+                                                * In batch mode, keep waiting 
for results until getting
+                                                * PGRES_BATCH_END.
+                                                */
+                                               if (PQbatchStatus(st->con) != 
PQBATCH_MODE_ON)
+                                                       st->state = 
CSTATE_END_COMMAND;
+                                               break;
+                                       case PGRES_BATCH_END:
+                                               if (PQexitBatchMode(st->con) == 
1)
+                                               {
+                                                       /* all results 
collected, exit out of command and batch mode */
+                                                       st->state = 
CSTATE_END_COMMAND;
+                                               }
+                                               else
+                                                       fprintf(stderr, "client 
%d to exit batch mode", st->id);
                                                break;
                                        default:
                                                commandFailed(st, 
PQerrorMessage(st->con));
@@ -3173,6 +3260,13 @@ process_backslash_command(PsqlScanState sstate, const 
char *source)
                        syntax_error(source, lineno, my_command->line, 
my_command->argv[0],
                                                 "missing command", NULL, -1);
        }
+       else if (pg_strcasecmp(my_command->argv[0], "beginbatch") == 0 ||
+                        pg_strcasecmp(my_command->argv[0], "endbatch") == 0 )
+       {
+               if (my_command->argc > 1)
+                       syntax_error(source, lineno, my_command->line, 
my_command->argv[0],
+                                                "unexpected argument", NULL, 
-1);
+       }
        else
        {
                syntax_error(source, lineno, my_command->line, 
my_command->argv[0],
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to