Hi all

Following on from the foreign table batch inserts thread[1], here's a patch
to add support for pipelining queries into asynchronous batches in libpq.

Attached, and also available at
https://github.com/2ndQuadrant/postgres/tree/dev/libpq-async-batch (subject
to rebasing and force pushes).

It's cleaned up over the draft I posted on that thread and has error
recovery implemented. I've written and included the SGML docs for it. The
test program is now pretty comprehensive, more so than for anything else in
libpq anyway. I'll submit it to the next CF as a 9.7/10.0 candidate.

I'm measuring 300x (not %) performance improvements doing batches on
servers over the Internet, so this seems pretty worthwhile. It turned out
to be way less invasive than I expected too.

(I intentionally didn't add any way for clients to annotate each work-item
in a batch with their own private data. I think that'd be really useful and
would make implementing clients easier, but should be a separate patch).

This should be very useful for optimising FDWs, Postgres-XC, etc.


[1]
http://www.postgresql.org/message-id/CAMsr+YFgDUiJ37DEfPRk8WDBuZ58psdAYJd8iNFSaGxtw=w...@mail.gmail.com

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 711a6951f57d84309bd2d7477a145ee3412b7967 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Fri, 20 May 2016 12:45:18 +0800
Subject: [PATCH] Pipelining (batch) support for libpq

Allow libpq clients to avoid excessive round trips by pipelining multiple
commands into batches. A sync is only sent at the end of a batch. Commands
in a batch succeed or fail together.

Includes a test program in src/test/examples/testlibpqbatch.c
---
 doc/src/sgml/libpq.sgml             |  478 +++++++++++++
 src/interfaces/libpq/exports.txt    |    7 +
 src/interfaces/libpq/fe-connect.c   |   16 +
 src/interfaces/libpq/fe-exec.c      |  572 +++++++++++++++-
 src/interfaces/libpq/fe-protocol2.c |    6 +
 src/interfaces/libpq/fe-protocol3.c |   17 +-
 src/interfaces/libpq/libpq-fe.h     |   13 +-
 src/interfaces/libpq/libpq-int.h    |   37 +-
 src/test/examples/.gitignore        |    1 +
 src/test/examples/Makefile          |    2 +-
 src/test/examples/testlibpqbatch.c  | 1254 +++++++++++++++++++++++++++++++++++
 11 files changed, 2360 insertions(+), 43 deletions(-)
 create mode 100644 src/test/examples/testlibpqbatch.c

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 3829a14..d6e896f 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4568,6 +4568,484 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch mode and query pipelining</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>pipelining</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> supports queueing up multiple queries into
+   a pipeline to be executed as a batch on the server. Batching queries allows
+   applications to avoid a client/server round-trip after each query to get
+   the results before issuing the next query.
+  </para>
+
+  <para>
+   An example of batch use may be found in the source distribution in
+   <filename>src/test/examples/testlibpqbatch.c</filename>.
+  </para>
+
+  <sect2>
+   <title>When to use batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It somewhat increases client application
+    complexity, and extra caution is required to prevent client/server network
+    deadlocks, but it can offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e. network latency
+    ("ping time") is high, and when many small operations are being performed in
+    rapid sequence. There is usually less benefit in using batches when each
+    query takes many multiples of the client/server round-trip time to execute.
+    A 100-statement operation run on a server 300ms round-trip-time away would take
+    30 seconds in network latency alone without batching; with batching it may spend
+    as little as 0.3s waiting for results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed into
+    operations on sets or into a
+    <link linkend="libpq-copy"><literal>COPY</literal></link> operation.
+   </para>
+
+   <para>
+    Batching is less useful when information from one operation is required by
+    the client before it knows enough to send the next operation. The client
+    must introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+     BEGIN;
+     SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+     -- result: x=2
+     -- client adds 1 to x:
+     UPDATE mytable SET x = 3 WHERE id = 42;
+     COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+     UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 9.6, but clients using it can
+     use batches on server versions 8.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using batch mode</title>
+
+   <para>
+    To issue batches the application must switch
+    <application>libpq</application> into batch mode. Enter batch mode with <link
+    linkend="libpq-pqbeginbatchmode"><function>PQbeginBatchMode(conn)</function></link> or test
+    whether batch mode is active with <link
+    linkend="libpq-pqisinbatchmode"><function>PQisInBatchMode(conn)</function></link>. In batch mode only <link
+    linkend="libpq-async">asynchronous operations</link> are permitted, and
+    <literal>COPY</literal> is not allowed. (The restriction on <literal>COPY</literal> is an implementation
+    limit; the PostgreSQL protocol and server can support batched <literal>COPY</literal>).
+   </para>
+
+   <para>
+    The client uses libpq's asynchronous query functions to dispatch work,
+    marking the end of each batch with <function>PQsendEndBatch</function>.
+    Concurrently, it uses <function>PQgetResult</function> and
+    <function>PQgetNextQuery</function> to get results. It may eventually exit
+    batch mode with <function>PQendBatchMode</function> once all results are
+    processed.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-pqsetnonblocking">non-blocking mode</link>. If used in
+     blocking mode it is possible for a client/server deadlock to occur: the
+     client blocks trying to send queries to the server, while the server
+     blocks trying to send the results of queries it has already processed back
+     to the client. This only occurs when the client sends enough queries to
+     fill both its output buffer and the server's receive buffer before
+     switching to processing input from the server, but it's hard to predict
+     exactly when that will happen, so it's best to always use non-blocking
+     mode.
+    </para>
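+
+    <para>
+     As a minimal sketch, a client would typically switch the connection into
+     non-blocking mode before entering batch mode:
+     <programlisting>
+      if (PQsetnonblocking(conn, 1) != 0)
+      {
+          fprintf(stderr, "could not set nonblocking mode: %s",
+                  PQerrorMessage(conn));
+          /* handle the error */
+      }
+     </programlisting>
+    </para>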
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests
+     using normal asynchronous <application>libpq</application> functions like
+     <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>,
+     etc. The asynchronous requests are followed by a <link
+     linkend="libpq-pqsendendbatch"><function>PQsendEndBatch(conn)</function></link> call to mark
+     the end of the batch. The client <emphasis>does not</emphasis> need to call
+     <function>PQgetResult</function> immediately after dispatching each
+     operation. <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
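+
+    <para>
+     For example (a simplified and purely illustrative sketch: the target table
+     and the <function>report_and_exit</function> error handler are assumptions
+     of this example, not part of the API), a client might queue two
+     parameterized inserts and then end the batch:
+     <programlisting>
+      const char *params[1];
+      Oid         param_types[1] = { 23 };    /* int4 */
+
+      /* report_and_exit() stands in for application-defined error handling */
+      if (!PQbeginBatchMode(conn))
+          report_and_exit(conn);
+
+      params[0] = "1";
+      if (!PQsendQueryParams(conn, "INSERT INTO mytable(itemno) VALUES ($1)",
+                             1, param_types, params, NULL, NULL, 0))
+          report_and_exit(conn);
+
+      params[0] = "2";
+      if (!PQsendQueryParams(conn, "INSERT INTO mytable(itemno) VALUES ($1)",
+                             1, param_types, params, NULL, NULL, 0))
+          report_and_exit(conn);
+
+      /* send the Sync that ends this batch and flush the send buffer */
+      if (!PQsendEndBatch(conn))
+          report_and_exit(conn);
+     </programlisting>
+    </para>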
+    
+    <para>
+     Batched operations will be executed by the server in the order the client
+     sends them. The server will send the results in the order the statements
+     were executed. The server usually begins executing the batch before all of
+     the commands in the batch have been queued and the end-of-batch command
+     has been sent. If any statement encounters an error, the server aborts the
+     current transaction and skips processing the rest of the batch. Query
+     processing resumes after the end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared statement
+     and then execute it with later statements in the same batch, as in the
+     sketch below.
+    </para>
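+
+    <para>
+     For instance, a named prepared statement can be created and executed
+     within a single batch (an illustrative sketch; the statement name, SQL,
+     and parameter value are arbitrary, and error handling is omitted):
+     <programlisting>
+      Oid         param_types[1] = { 23 };    /* int4 */
+      const char *params[1] = { "42" };
+
+      PQsendPrepare(conn, "get_row",
+                    "SELECT x FROM mytable WHERE id = $1",
+                    1, param_types);
+      PQsendQueryPrepared(conn, "get_row",
+                          1, params, NULL, NULL, 0);
+      PQsendEndBatch(conn);
+     </programlisting>
+    </para>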
+
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing results</title>
+
+    <para>
+     The client <link linkend="libpq-batch-interleave">interleaves result
+     processing with sending batch queries</link>, or for small batches may
+     process all results after sending the whole batch.
+    </para>
+
+    <para>
+     To get the result of the first batch entry the client must call <link
+     linkend="libpq-pqgetnextquery"><function>PQgetNextQuery</function></link>. It must then call
+     <function>PQgetResult</function> and handle the results until
+     <function>PQgetResult</function> returns null (or would return null if
+     called). The result from the next batch entry may then be retrieved using
+     <function>PQgetNextQuery</function> and the cycle repeated.  The
+     application handles individual statement results as normal.
+    </para>
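+
+    <para>
+     A sketch of processing the results of a single batch entry, in blocking
+     mode and with error handling abbreviated, might look like this:
+     <programlisting>
+      PGresult *res;
+
+      /* advance to the next queued query's results */
+      if (!PQgetNextQuery(conn))
+          report_and_exit(conn);    /* application-defined error handler */
+
+      /* collect each result for that query, as in normal asynchronous use */
+      while ((res = PQgetResult(conn)) != NULL)
+      {
+          /* inspect PQresultStatus(res) and use the result here */
+          PQclear(res);
+      }
+     </programlisting>
+     Repeating this cycle while <function>PQqueriesInBatch</function> reports
+     work remaining drains the whole batch; the last entry of each batch is the
+     <literal>PGRES_BATCH_END</literal> result corresponding to the
+     <function>PQsendEndBatch</function> call.
+    </para>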
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal asynchronous
+     processing except that it may return the new <type>PGresult</type> types
+     <literal>PGRES_BATCH_END</literal> and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQsendEndBatch</function> call at the corresponding point in
+     the result stream and at no other time. <literal>PGRES_BATCH_ABORTED</literal>
+     is emitted during error handling; see <link linkend="libpq-batch-errors">
+     error handling</link>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc.
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed. The application
+     must keep track of the order in which it sent queries and the expected
+     results. Applications will typically use a state machine or a FIFO queue
+     for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal>, the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client's perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function>, the batch is flagged as aborted.
+     <application>libpq</application> will report a
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQsendEndBatch</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and the resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetNextQuery</function> and
+     <function>PQgetResult</function> during error recovery.
+    </para>
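+
+    <para>
+     A sketch of a result loop that tolerates an aborted batch (simplified; the
+     application's own bookkeeping of dispatched work is omitted) might look
+     like this:
+     <programlisting>
+      while (PQgetNextQuery(conn))
+      {
+          PGresult *res;
+
+          while ((res = PQgetResult(conn)) != NULL)
+          {
+              switch (PQresultStatus(res))
+              {
+                  case PGRES_FATAL_ERROR:
+                      /* this entry failed; the rest of the batch is aborted */
+                      fprintf(stderr, "query failed: %s",
+                              PQresultErrorMessage(res));
+                      break;
+                  case PGRES_BATCH_ABORTED:
+                      /* skipped because of an earlier error in the batch */
+                      break;
+                  case PGRES_BATCH_END:
+                      /* end of this batch; normal processing resumes after it */
+                      break;
+                  default:
+                      /* a successful result; use it */
+                      break;
+              }
+              PQclear(res);
+          }
+      }
+     </programlisting>
+    </para>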
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted, and all subsequent operations in the current transaction and in
+     all later transactions in the same batch are skipped completely.
+    </para>
+
+    <para>
+     The client must not assume that work is committed when it
+     <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+     corresponding result is received to confirm the commit is complete.
+     Because errors arrive asynchronously, the application needs to be able to
+     restart from the last <emphasis>received</emphasis> committed change and
+     resend work done after that point if something goes wrong.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving result processing and query dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured around
+     a nonblocking I/O loop using a function like <function>select</function>,
+     <function>poll</function>, <function>epoll</function>,
+     <function>WaitForMultipleObjectsEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work still to
+     be dispatched and a queue of work that has been dispatched but not yet had
+     its results processed. When the socket is writable it should dispatch more
+     work. When the socket is readable it should read results and process them,
+     matching them up to the next entry in its expected results queue. Batches
+     should be scoped to logical units of work, usually (but not always) one
+     transaction per batch. There's no need to exit batch mode and re-enter it
+     between batches or to wait for one batch to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state machine
+     to track sent and received work is in
+     <filename>src/test/examples/testlibpqbatch.c</filename> in the PostgreSQL
+     source distribution.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending batch mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed, and the
+     end-of-batch result has been consumed, the application may return to
+     non-batched mode with
+     <link linkend="libpq-pqendbatchmode"><function>PQendBatchMode(conn)</function></link>.
+    </para>
+   </sect3>
+
+  </sect2>
+
+  <sect2 id="libpq-funcs-batch">
+   <title>Functions associated with batch mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-pqisinbatchmode">
+     <term>
+      <function>PQisInBatchMode</function>
+      <indexterm>
+       <primary>PQisInBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Returns 1 if a <application>libpq</application> connection is in <link
+       linkend="libpq-batch-mode">batch mode</link>, otherwise 0.
+
+<synopsis>
+int PQisInBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqbeginbatchmode">
+     <term>
+      <function>PQbeginBatchMode</function>
+      <indexterm>
+       <primary>PQbeginBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle and
+      returns 1 for success. Returns 1 and takes no action if the connection
+      is already in batch mode. Returns 0 and has no effect if the connection
+      is not currently idle, i.e. it has a result ready, is waiting for more
+      input from the server, etc. This function does not actually send
+      anything to the server; it just changes the
+      <application>libpq</application> connection state.
+
+<synopsis>
+int PQbeginBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqendbatchmode">
+     <term>
+      <function>PQendBatchMode</function>
+      <indexterm>
+       <primary>PQendBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to exit batch mode if it is currently in batch mode
+      with an empty queue and no pending results, and returns 1 for success.
+      Returns 1 and takes no action if not in batch mode. Returns 0 and does
+      nothing if the connection has pending batch items in the queue for
+      reading with <function>PQgetNextQuery</function>, if the current
+      statement isn't finished processing, or if there are results pending
+      for collection with <function>PQgetResult</function>.
+
+<synopsis>
+int PQendBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqsendendbatch">
+     <term>
+      <function>PQsendEndBatch</function>
+      <indexterm>
+       <primary>PQsendEndBatch</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Delimits the end of a set of batched commands by sending a <link
+      linkend="protocol-flow-ext-query">sync message</link> and flushing
+      the send buffer. The end of a batch serves as
+      the delimiter of an implicit transaction and
+      an error recovery point; see <link linkend="libpq-batch-errors">
+      error handling</link>.
+
+<synopsis>
+int PQsendEndBatch(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqgetnextquery">
+     <term>
+      <function>PQgetNextQuery</function>
+      <indexterm>
+       <primary>PQgetNextQuery</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes the connection to start processing the next queued query's
+      results. Returns 1 if a new query was popped from the result queue
+      for processing. Returns 0 and has no effect if there are no query results
+      pending, batch mode is not enabled, or if the query currently being
+      processed is incomplete or still has pending results. See <link
+      linkend="libpq-batch-results">processing results</link>.
+
+<synopsis>
+int PQgetNextQuery(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqqueriesinbatch">
+     <term>
+      <function>PQqueriesInBatch</function>
+      <indexterm>
+       <primary>PQqueriesInBatch</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns the number of queries still in the queue for this batch, not
+      including any query whose results are currently being processed.
+      This is the number of times <function>PQgetNextQuery</function> has to be
+      called before the query queue is empty again.
+
+<synopsis>
+int PQqueriesInBatch(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqbatchisaborted">
+     <term>
+      <function>PQbatchIsAborted</function>
+      <indexterm>
+       <primary>PQbatchIsAborted</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Returns 1 if the batch currently being received on a
+       <application>libpq</application> connection in <link
+       linkend="libpq-batch-mode">batch mode</link> is
+       <link linkend="libpq-batch-errors">aborted</link>, 0
+       otherwise. The aborted flag is cleared as soon as the result of the
+       <function>PQsendEndBatch</function> at the end of the aborted batch is
+       processed. Clients don't usually need this function as they can tell
+       that the batch is aborted from <literal>PGRES_BATCH_ABORTED</literal>
+       result codes.
+
+<synopsis>
+int PQbatchIsAborted(PGconn *conn);
+</synopsis>
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+
+  </sect2>
+
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-By-Row</title>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 21dd772..1b0b8c5 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -171,3 +171,10 @@ PQsslAttributeNames       168
 PQsslAttribute            169
 PQsetErrorContextVisibility 170
 PQresultVerboseErrorMessage 171
+PQisInBatchMode           172
+PQqueriesInBatch          173
+PQbeginBatchMode          174
+PQendBatchMode            175
+PQsendEndBatch            176
+PQgetNextQuery            177
+PQbatchIsAborted          178
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 9b2839b..78154e2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2949,6 +2949,7 @@ static void
 closePGconn(PGconn *conn)
 {
 	PGnotify   *notify;
+	PGcommandQueueEntry *queue;
 	pgParameterStatus *pstatus;
 
 	/*
@@ -2995,6 +2996,21 @@ closePGconn(PGconn *conn)
 		free(prev);
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
+	queue = conn->cmd_queue_head;
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+	queue = conn->cmd_queue_recycle;
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_recycle = NULL;
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
 	{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 2621767..319c63e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_END",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry* PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
 
 
 /* ----------------
@@ -1090,7 +1095,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1113,6 +1118,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+				  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return false;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1211,9 +1223,29 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char 	   **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
 	/* check the arguments */
 	if (!stmtName)
 	{
@@ -1269,18 +1301,21 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1290,10 +1325,14 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1340,6 +1379,81 @@ PQsendQueryPrepared(PGconn *conn,
 						   resultFormat);
 }
 
+/* Get a new command queue entry, allocating it if required. This doesn't add
+ * the entry to the tail of the queue yet; use PQappendPipelinedCommand for
+ * that once the command has been written. If sending the command fails after
+ * this has been called, use PQrecyclePipelinedCommand to put the entry back
+ * on the freelist.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcommandQueueEntry*
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry*) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/* Append a precreated command queue entry to the queue after it's been
+ * sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/* Push a command queue entry onto the freelist. It must be a dangling entry
+ * with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr, "tried to recycle non-dangling command queue entry");
+		abort();
+	}
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/* Set up for processing a new query's results */
+static void
+PQstartProcessingNewQuery(PGconn *conn)
+{
+	/* initialize async result-accumulation state */
+	conn->result = NULL;
+	conn->next_result = NULL;
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+}
+
 /*
  * Common startup code for PQsendQuery and sibling routines
  */
@@ -1359,20 +1473,52 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && !conn->in_batch)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+	if (conn->in_batch)
+	{
+		/* When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQgetNextQuery(...) advances
+		 * to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+			case PGASYNC_IDLE:
+				fprintf(stderr, "internal error, idle state in batch mode");
+				abort();
+				break;
+		}
+	}
+	else
+	{
+		/* This command's results will come in immediately */
+		PQstartProcessingNewQuery(conn);
+	}
 
 	/* ready to send command message */
 	return true;
@@ -1397,6 +1543,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char 	   **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1406,6 +1556,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1518,22 +1685,25 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1543,10 +1713,15 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1673,6 +1848,282 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY;
 }
 
+/* PQisInBatchMode
+ *   Return true if currently in batch mode
+ */
+int
+PQisInBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return FALSE;
+
+	return conn->in_batch;
+}
+
+/* PQqueriesInBatch
+ *   Return true if there are queries currently pending in batch mode
+ */
+int
+PQqueriesInBatch(PGconn *conn)
+{
+	if (!PQisInBatchMode(conn))
+		return false;
+
+	return conn->cmd_queue_head != NULL;
+}
+
+/* PQbatchIsAborted
+ *   Batch being processed is aborted, results discarded until next sync
+ */
+int
+PQbatchIsAborted(PGconn *conn)
+{
+	if (!PQisInBatchMode(conn))
+		return false;
+
+	return conn->batch_aborted;
+}
+
+/* Put an idle connection in batch mode. Commands submitted after this
+ * can be pipelined on the connection, there's no requirement to wait for
+ * one to finish before the next is dispatched.
+ *
+ * COPY is not permitted in batch mode.
+ *
+ * A set of commands is terminated by a PQsendEndBatch. Multiple sets of batched
+ * commands may be sent while in batch mode. Batch mode can be exited by
+ * calling PQendBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQbeginBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (conn->in_batch)
+		return true;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return false;
+
+	conn->in_batch = true;
+	conn->batch_aborted = false;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return true;
+}
+
+/* End batch mode and return to normal command mode.
+ *
+ * Has no effect unless the client has processed all results
+ * from all outstanding batches and the connection is idle.
+ *
+ * Returns true if batch mode ended.
+ */
+int
+PQendBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return true;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			return false;
+		case PGASYNC_QUEUED:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		return false;
+
+	conn->in_batch = false;
+	conn->batch_aborted = false;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	return true;
+}
+
+/* End a batch submission by sending a protocol sync. The connection will
+ * remain in batch mode and unavailable for new non-batch commands until all
+ * results from the batch are processed by the client.
+ *
+ * It's legal to start submitting another batch immediately, without waiting
+ * for the results of the current batch. There's no need to end batch mode
+ * and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and including
+ * the PQsendEndBatch command result gets set to PGRES_BATCH_ABORTED state. If the
+ * whole batch is processed without error, a PGresult with PGRES_BATCH_END is
+ * produced.
+ */
+int
+PQsendEndBatch(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	if (entry == NULL)
+		return false;			/* error message already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	PQappendPipelinedCommand(conn, entry);
+
+	/* Should try to flush immediately if there's room */
+	PQflush(conn);
+
+	return true;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	pqHandleSendFailure(conn);
+	return false;
+}
+
+/* PQgetNextQuery
+ *   In batch mode, start processing the next query in the queue.
+ *
+ * Returns true if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns false if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQgetNextQuery(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return FALSE;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return false;
+			break;
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, idle in batch mode");
+			abort();
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/* In batch mode but nothing left on the queue; caller can submit
+		 * more work or PQendBatchMode() now. */
+		return false;
+	}
+
+	/* Pop the next query from the queue and set up the connection state
+	 * as if it'd just been dispatched from a non-batched call */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	PQstartProcessingNewQuery(conn);
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	if (conn->batch_aborted && conn->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted batch we don't get anything from the server for each
+		 * result; we're just discarding input until we get to the next sync
+		 * from the server. The client needs to know its queries got aborted
+		 * so we create a fake PGresult to return immediately from PQgetResult.
+		 */
+		conn->result = PQmakeEmptyPGresult(conn,
+				PGRES_BATCH_ABORTED);
+		if (!conn->result)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+					libpq_gettext("out of memory"));
+			pqSaveErrorResult(conn);
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+
+		/* Parse any available data */
+		parseInput(conn);
+	}
+
+	return true;
+}
+
 
 /*
  * PQgetResult
@@ -1680,7 +2131,6 @@ PQisBusy(PGconn *conn)
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1732,10 +2182,31 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
 			break;
 		case PGASYNC_READY:
 			res = pqPrepareAsyncResult(conn);
+			if (conn->in_batch)
+			{
+				/* batched queries aren't followed by a Sync to put us back in
+				 * PGASYNC_IDLE state, and when we do get a sync we could still
+				 * have another batch coming after this one.
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. Nor is it busy, since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
+			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
 			break;
@@ -1915,6 +2386,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+		  libpq_gettext("cannot PQexec in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2109,6 +2587,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2124,6 +2605,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2132,15 +2627,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
 	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->last_query && !conn->in_batch)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2154,10 +2652,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index f1b90f3..e824f36 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 0b8c62f..3d6e6c9 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -220,10 +220,18 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->in_batch)
+					{
+						conn->batch_aborted = false;
+						conn->result = PQmakeEmptyPGresult(conn,
+								PGRES_BATCH_END);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -305,7 +313,7 @@ pqParseInput3(PGconn *conn)
 						 * parsing until the application accepts the current
 						 * result.
 						 */
-						conn->asyncStatus = PGASYNC_READY;
+						conn->asyncStatus = PGASYNC_READY_MORE;
 						return;
 					}
 					break;
@@ -880,6 +888,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	if (isError && conn->in_batch)
+		conn->batch_aborted = true;
+
 	/*
 	 * Since the fields might be pretty long, we create a temporary
 	 * PQExpBuffer rather than using conn->workBuffer.  workBuffer is intended
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756..a05298f 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -91,7 +91,9 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_END,			/* end of a batch of commands */
+	PGRES_BATCH_ABORTED			/* command didn't run because of an abort earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -421,6 +423,15 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int PQisInBatchMode(PGconn *conn);
+extern int PQbatchIsAborted(PGconn *conn);
+extern int PQqueriesInBatch(PGconn *conn);
+extern int PQbeginBatchMode(PGconn *conn);
+extern int PQendBatchMode(PGconn *conn);
+extern int PQsendEndBatch(PGconn *conn);
+extern int PQgetNextQuery(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1183323..a21e07c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,13 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch result;
+								 * more results expected from this query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +232,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -292,6 +296,22 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+  PGQueryClass	queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+  char		   *query;		/* SQL command, or NULL if unknown */
+  struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -356,6 +376,8 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	bool		in_batch;		/* connection is in batch (pipelined) mode */
+	bool		batch_aborted;	/* current batch is aborted, discarding until next Sync */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
@@ -363,6 +385,15 @@ struct pg_conn
 	PGnotify   *notifyHead;		/* oldest unreported Notify msg */
 	PGnotify   *notifyTail;		/* newest unreported Notify msg */
 
+	/* The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	/* See PQconnectPoll() for how we use 'int' and not 'pgsocket'. */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
diff --git a/src/test/examples/.gitignore b/src/test/examples/.gitignore
index 1957ec1..7a72420 100644
--- a/src/test/examples/.gitignore
+++ b/src/test/examples/.gitignore
@@ -4,3 +4,4 @@
 /testlibpq4
 /testlo
 /testlo64
+/testlibpqbatch
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index 31da210..92a6faf 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqbatch
 
 all: $(PROGS)
 
diff --git a/src/test/examples/testlibpqbatch.c b/src/test/examples/testlibpqbatch.c
new file mode 100644
index 0000000..b137d6f
--- /dev/null
+++ b/src/test/examples/testlibpqbatch.c
@@ -0,0 +1,1254 @@
+/*
+ * src/test/examples/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ *		Test of batch execution functionality
+ */
+
+#ifdef WIN32
+#include <windows.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include "libpq-fe.h"
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+static void test_batch_abort(PGconn *conn);
+
+#ifndef VERBOSE
+#define VERBOSE 0
+#endif
+
+static const Oid INT4OID = 23;
+
+static const char *const drop_table_sql
+= "DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql
+= "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql
+= "INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple batch... ");
+	fflush(stderr);
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our out buffer will give us enough slush to
+	 * work with and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode\n");
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode with work in progress should fail, but succeeded\n");
+		goto fail;
+	}
+
+	if (!PQsendEndBatch(conn))
+	{
+		fprintf(stderr, "Ending a batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/*
+	 * in batch mode we have to ask for the first result to be processed;
+	 * until we do PQgetResult will return null:
+	 */
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something in a batch before first PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/* We can't PQgetNextQuery when there might still be pending results */
+	if (PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() should've failed with pending results: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first batch item\n",
+				PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something extra after first result before PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit batch mode yet
+	 */
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode after query but before sync succeeded incorrectly\n");
+		goto fail;
+	}
+
+	/* should now get an explicit sync result */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at sync after first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+	{
+		fprintf(stderr, "Unexpected result code %s instead of sync result, error: %s\n",
+				PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+		goto fail;
+	}
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something extra after end batch call\n");
+		goto fail;
+	}
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* until we end it, which we can safely do now */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode didn't seem to work\n");
+		goto fail;
+	}
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+	fflush(stderr);
+
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode: %u\n", __LINE__);
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "Unable to enter batch mode\n");
+		goto fail;
+	}
+
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not activated properly\n");
+		goto fail;
+	}
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+	{
+		fprintf(stderr, "PQexec should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* So should PQsendQuery */
+	if (PQsendQuery(conn, "SELECT 1") != 0)
+	{
+		fprintf(stderr, "PQsendQuery should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* Entering batch mode when already in batch mode is OK */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "re-entering batch mode should be a no-op but failed\n");
+		goto fail;
+	}
+
+	if (PQisBusy(conn))
+	{
+		fprintf(stderr, "PQisBusy should return false when idle in batch, returned true\n");
+		goto fail;
+	}
+
+	/* ok, back to normal command mode */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "couldn't exit idle empty batch mode\n");
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not terminated properly\n");
+		goto fail;
+	}
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "batch mode exit when not in batch mode should succeed but failed\n");
+		goto fail;
+	}
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "PQexec should succeed after exiting batch mode but failed with: %s\n",
+				PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+multi_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi batch... ");
+	fflush(stderr);
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first.
+	 */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching first SELECT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendEndBatch(conn))
+	{
+		fprintf(stderr, "Ending first batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching second SELECT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendEndBatch(conn))
+	{
+		fprintf(stderr, "Ending second batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
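+
+	/*
+	 * Both batches are now queued. The results arrive strictly in order:
+	 * tuples for the first SELECT, a PGRES_BATCH_END for the first sync,
+	 * tuples for the second SELECT, then a PGRES_BATCH_END for the second
+	 * sync.
+	 */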
+
+	/* OK, start processing the batch results */
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something in a batch before first PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first batch item\n",
+				PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something extra after first result before PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "PQendBatchMode succeeded before the batch sync was processed; it should have failed\n");
+		goto fail;
+	}
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at sync after first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+	{
+		fprintf(stderr, "Unexpected result code %s instead of sync result, error: %s (line %u)\n",
+		   PQresStatus(PQresultStatus(res)), PQerrorMessage(conn), __LINE__);
+		goto fail;
+	}
+
+	PQclear(res);
+	res = NULL;
+
+	/* second batch */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at second batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from second batch item\n",
+				PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at second batch sync: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+	{
+		fprintf(stderr, "Unexpected result code %s from second batch sync\n",
+				PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* until we end it, which we can safely do now */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode didn't seem to work\n");
+		goto fail;
+	}
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+/*
+ * When an operation in a batch fails, the rest of the batch is skipped. We
+ * still have to get a result for each batch item, but the skipped items just
+ * report a PGRES_BATCH_ABORTED status code.
+ *
+ * This intentionally doesn't use a transaction to wrap the batch. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+
+	fprintf(stderr, "aborted batch... ");
+	fflush(stderr);
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * batch ERRORs.
+	 */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	dummy_params[0] = "1";
+	if (!PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching first INSERT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT no_such_function($1)", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching error select failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	dummy_params[0] = "2";
+	if (!PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching second insert failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendEndBatch(conn))
+	{
+		fprintf(stderr, "Ending first batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	dummy_params[0] = "3";
+	if (!PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching second-batch insert failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendEndBatch(conn))
+	{
+		fprintf(stderr, "Ending second batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/*
+	 * OK, start processing the batch results.
+	 *
+	 * We should get a command-ok for the first insert, a fatal error for the
+	 * bad select, a batch-aborted for the second insert, a batch-end for the
+	 * first sync, then a command-ok and a batch-end for the second batch.
+	 */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first batch item, error='%s'\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)),
+			 res == NULL ? PQerrorMessage(conn) : PQresultErrorMessage(res));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	/* second query, caused error */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at second batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_FATAL_ERROR)
+	{
+		fprintf(stderr, "Unexpected result code from second batch item. Wanted PGRES_FATAL_ERROR, got %s\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	/*
+	 * batch should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third batch since we've already sent a
+	 * second. The aborted flag relates only to the batch being received.
+	 */
+	if (!PQbatchIsAborted(conn))
+	{
+		fprintf(stderr, "batch should be flagged as aborted but isn't\n");
+		goto fail;
+	}
+
+	/* third query in batch, the second insert */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at third batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_BATCH_ABORTED)
+	{
+		fprintf(stderr, "Unexpected result code from third batch item. Wanted PGRES_BATCH_ABORTED, got %s\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	if (!PQbatchIsAborted(conn))
+	{
+		fprintf(stderr, "batch should be flagged as aborted but isn't\n");
+		goto fail;
+	}
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* the batch sync */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch sync: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/*
+	 * The end of a failed batch is still a PGRES_BATCH_END so clients know to
+	 * start processing results normally again and can tell the difference
+	 * between skipped commands and the sync.
+	 */
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_BATCH_END)
+	{
+		fprintf(stderr, "Unexpected result code from first batch sync. Wanted PGRES_BATCH_END, got %s\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	if (PQbatchIsAborted(conn))
+	{
+		fprintf(stderr, "sync should've cleared the aborted flag but didn't\n");
+		goto fail;
+	}
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* the insert from the second batch */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first entry in second batch: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first item in second batch\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	/* the second batch sync */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at second batch sync: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (((res = PQgetResult(conn)) == NULL) || PQresultStatus(res) != PGRES_BATCH_END)
+	{
+		fprintf(stderr, "Unexpected result code %s from second batch sync\n",
+				res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+	PQclear(res);
+	res = NULL;
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* until we end it, which we can safely do now */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode didn't seem to work\n");
+		goto fail;
+	}
+
+	fprintf(stderr, "ok\n");
+
+	/*
+	 * Since we fired the batches off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - Implicit xact started by server around 1st batch
+	 * - First insert applied
+	 * - Second statement aborted xact
+	 * - Third insert skipped
+	 * - Sync rolled back first implicit xact
+	 * - Implicit xact created by server around 2nd batch
+	 * - Insert applied from 2nd batch
+	 * - Sync commits 2nd xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Expected tuples, got %s: %s",
+				PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+		goto fail;
+	}
+
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		if (strcmp(val, "3") != 0)
+		{
+			fprintf(stderr, "expected only insert with value 3, got %s\n", val);
+			goto fail;
+		}
+	}
+
+	if (PQntuples(res) != 1)
+	{
+		fprintf(stderr, "expected 1 result, got %d\n", PQntuples(res));
+		goto fail;
+	}
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+/* State machine enums for batch insert */
+typedef enum BatchInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+}	BatchInsertStep;
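+
+/*
+ * batch_insert_pipelined() drives two instances of this state machine:
+ * send_step tracks what has been dispatched so far, while recv_step tracks
+ * which results have been consumed. The two advance independently, so more
+ * queries can be queued while earlier results are still being read.
+ */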
+
+static void
+batch_insert_pipelined(PGconn *conn, int n_rows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	BatchInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a batched insert into a table created at the start of the batch
+	 */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "BEGIN",
+						   0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "xact start failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+#if VERBOSE
+	fprintf(stdout, "sent BEGIN\n");
+#endif
+	send_step = BI_DROP_TABLE;
+
+	if (!PQsendQueryParams(conn, drop_table_sql,
+						   0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+#if VERBOSE
+	fprintf(stdout, "sent DROP\n");
+#endif
+	send_step = BI_CREATE_TABLE;
+
+	if (!PQsendQueryParams(conn, create_table_sql,
+						   0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+#if VERBOSE
+	fprintf(stdout, "sent CREATE\n");
+#endif
+	send_step = BI_PREPARE;
+
+	if (!PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids))
+	{
+		fprintf(stderr, "dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+#if VERBOSE
+	fprintf(stdout, "sent PREPARE\n");
+#endif
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our out buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the batch before processing results.
+	 */
+	if (PQsetnonblocking(conn, 1) != 0)
+	{
+		fprintf(stderr, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any available results, so that we drain the server's output
+		 * buffer and it can keep working through its input without blocking
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				const char *cmdtag;
+				const char *description = NULL;
+				int			status;
+				BatchInsertStep next_step;
+
+
+				res = PQgetResult(conn);
+
+				if (res == NULL)
+				{
+					/*
+					 * No more results from this query; advance to the next
+					 * query in the batch
+					 */
+					if (!PQgetNextQuery(conn))
+					{
+						fprintf(stderr, "Expected next query result but unable to dequeue: %s\n",
+								PQerrorMessage(conn));
+						goto fail;
+					}
+#if VERBOSE
+					fprintf(stdout, "next query!\n");
+#endif
+					continue;
+				}
+
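+				/*
+				 * By default expect a COMMAND_OK result and advance to the
+				 * following state; the switch below overrides these for the
+				 * insert and sync steps.
+				 */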
+				status = PGRES_COMMAND_OK;
+				next_step = recv_step + 1;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive > 0)
+							next_step = BI_INSERT_ROWS;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_BATCH_END;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						abort();
+				}
+				if (description == NULL)
+					description = cmdtag;
+
+#if VERBOSE
+				fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n",
+						recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step);
+#endif
+
+				if (PQresultStatus(res) != status)
+				{
+					fprintf(stderr, "%s reported status %s, expected %s. Error msg is [%s]\n",
+							description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn));
+					goto fail;
+				}
+				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+				{
+					fprintf(stderr, "%s expected command tag '%s', got '%s'\n",
+							description, cmdtag, PQcmdStatus(res));
+					goto fail;
+				}
+#if VERBOSE
+				fprintf(stdout, "Got %s OK\n", cmdtag);
+#endif
+				recv_step = next_step;
+
+				PQclear(res);
+				res = NULL;
+			}
+		}
+
+		/* Write more rows and/or the end batch message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
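+				/* rows are sent counting down from n_rows to 1 */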
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+				insert_param_0[MAXINTLEN - 1] = '\0';
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0))
+				{
+#if VERBOSE
+					fprintf(stdout, "sent row %d\n", rows_to_send);
+#endif
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0))
+				{
+#if VERBOSE
+					fprintf(stdout, "sent COMMIT\n");
+#endif
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQsendEndBatch(conn))
+				{
+#if VERBOSE
+					fprintf(stdout, "Dispatched end batch message\n");
+#endif
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+
+	}
+
+	/* We've got the sync message and the batch should be done */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQsetnonblocking(conn, 0) != 0)
+	{
+		fprintf(stderr, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
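+/*
+ * The same insert workload done synchronously, one statement per round trip.
+ * main() times this against batch_insert_pipelined() for comparison.
+ */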
+static void
+batch_insert_sequential(PGconn *conn, int nrows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+
+	insert_params[0] = &insert_param_0[0];
+
+	res = PQexec(conn, "BEGIN");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "BEGIN failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "prepare failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	while (nrows > 0)
+	{
+		snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows);
+		insert_param_0[MAXINTLEN - 1] = '\0';
+
+		res = PQexecPrepared(conn, "my_insert2",
+							 1, insert_params, NULL, NULL, 0);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			fprintf(stderr, "INSERT failed: %s\n", PQerrorMessage(conn));
+			goto fail;
+		}
+		PQclear(res);
+		nrows--;
+	}
+
+	res = PQexec(conn, "COMMIT");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "COMMIT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+usage_exit(const char *progname)
+{
+	fprintf(stderr, "Usage: %s ['connstring' [number_of_rows]]\n", progname);
+	exit(1);
+}
+
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo;
+	PGconn	   *conn;
+	struct timeval start_time,
+				end_time,
+				elapsed_time;
+	int			number_of_rows = 10000;
+
+	/*
+	 * If the user supplies a parameter on the command line, use it as the
+	 * conninfo string; otherwise default to setting dbname=postgres and using
+	 * environment variables or defaults for all other connection parameters.
+	 */
+	if (argc > 3)
+	{
+		usage_exit(argv[0]);
+	}
+	if (argc > 2)
+	{
+		errno = 0;
+		number_of_rows = strtol(argv[2], NULL, 10);
+		if (errno)
+		{
+			fprintf(stderr, "couldn't parse '%s' as an integer: %s\n", argv[2], strerror(errno));
+			usage_exit(argv[0]);
+		}
+		if (number_of_rows <= 0)
+		{
+			fprintf(stderr, "number_of_rows must be positive\n");
+			usage_exit(argv[0]);
+		}
+	}
+	if (argc > 1)
+	{
+		conninfo = argv[1];
+	}
+	else
+	{
+		conninfo = "dbname = postgres";
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+
+	/* Check to see that the backend connection was successfully made */
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+
+	test_disallowed_in_batch(conn);
+	simple_batch(conn);
+	multi_batch(conn);
+	test_batch_abort(conn);
+
+	fprintf(stderr, "inserting %d rows batched then unbatched\n", number_of_rows);
+
+	gettimeofday(&start_time, NULL);
+	batch_insert_pipelined(conn, number_of_rows);
+	gettimeofday(&end_time, NULL);
+	timersub(&end_time, &start_time, &elapsed_time);
+	printf("batch insert elapsed:      %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec);
+
+	gettimeofday(&start_time, NULL);
+	batch_insert_sequential(conn, number_of_rows);
+	gettimeofday(&end_time, NULL);
+	timersub(&end_time, &start_time, &elapsed_time);
+	printf("sequential insert elapsed: %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec);
+
+	fprintf(stderr, "Done.\n");
+
+
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+
+	return 0;
+}
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
