On 2021-Mar-05, Alvaro Herrera wrote:

> I'll take the weekend to think about the issue with conn->last_query and
> conn->queryclass that I mentioned yesterday; other than that detail my
> feeling is that this is committable, so I'll be looking at getting this
> pushed early next weeks, barring opinions from others.

It took longer than I expected, but it works well now.  conn->last_query
is gone; all commands, both in pipeline mode and in non-pipeline mode, go
via the command queue.  This is cleaner all around; we don't have to
have the pipeline code "cheat" so that it looks like each command is
"last" at each point.

I have not absorbed David Johnston's latest doc suggestions yet.

I'm going to give the code a last renaming pass, on the idea that the
command queue is no longer exclusively for the pipeline mode, so some
things need less exclusionary names.  But functionality-wise, AFAICS, this
patch has the shape it ought to have.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 910e9a81ea..0bffb92462 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res);
            </para>
           </listitem>
          </varlistentry>
+
+         <varlistentry id="libpq-pgres-pipeline-sync">
+          <term><literal>PGRES_PIPELINE_SYNC</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a
+            synchronization point in pipeline mode, requested by
+            <xref linkend="libpq-PQsendPipeline"/>.
+            This status occurs only when pipeline mode has been selected.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-pipeline-aborted">
+          <term><literal>PGRES_PIPELINE_ABORTED</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a pipeline that has
+            received an error from the server.  <function>PQgetResult</function>
+            must be called repeatedly, and each time it will return this status code
+            until the end of the current pipeline, at which point it will return
+            <literal>PGRES_PIPELINE_SYNC</literal> and normal processing can
+            resume.
+           </para>
+          </listitem>
+         </varlistentry>
+
         </variablelist>
 
         If the result status is <literal>PGRES_TUPLES_OK</literal> or
@@ -4926,6 +4953,473 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-pipeline-mode">
+  <title>Pipeline Mode</title>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>libpq</primary>
+   <secondary>pipeline mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>pipelining</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>batch mode</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> pipeline mode allows applications to
+   send a query without having to read the result of the previously
+   sent query.  Taking advantage of the pipeline mode, a client will wait
+   less for the server, since multiple queries/results can be
+   sent/received in a single network transaction.
+  </para>
+
+  <para>
+   While pipeline mode provides a significant performance boost, writing
+   clients using the pipeline mode is more complex because it involves
+   managing a queue of pending queries and finding which result
+   corresponds to which query in the queue.
+  </para>
+
+  <para>
+   Pipeline mode also generally consumes more memory on both the client and server,
+   though careful and aggressive management of the send/receive queue can mitigate
+   this.  This applies whether the connection is in blocking or non-blocking
+   mode.
+  </para>
+
+  <sect2 id="libpq-pipeline-using">
+   <title>Using Pipeline Mode</title>
+
+   <para>
+    To issue pipelines, the application must switch the connection
+    into pipeline mode,
+    which is done with <xref linkend="libpq-PQenterPipelineMode"/>.
+    <xref linkend="libpq-PQpipelineStatus"/> can be used
+    to test whether pipeline mode is active.
+    In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link>
+    are permitted, and <literal>COPY</literal> is disallowed.
+    Using synchronous command execution functions
+    such as <function>PQfn</function>,
+    <function>PQexec</function>,
+    <function>PQexecParams</function>,
+    <function>PQprepare</function>,
+    <function>PQexecPrepared</function>,
+    <function>PQdescribePrepared</function>,
+    <function>PQdescribePortal</function>,
+    is an error condition.
+    Once all dispatched commands have had their results processed, and
+    the end pipeline result has been consumed, the application may return
+    to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>.
+   </para>
+
+   <note>
+    <para>
+     It is best to use pipeline mode with <application>libpq</application> in
+     <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
+     in blocking mode it is possible for a client/server deadlock to occur.
+      <footnote>
+       <para>
+        The client will block trying to send queries to the server, but the
+        server will block trying to send results to the client from queries
+        it has already processed. This only occurs when the client sends
+        enough queries to fill both its output buffer and the server's receive
+        buffer before it switches to processing input from the server,
+        but it's hard to predict exactly when that will happen.
+       </para>
+      </footnote>
+    </para>
+   </note>
+
+   <sect3 id="libpq-pipeline-sending">
+    <title>Issuing Queries</title>
+
+    <para>
+     After entering pipeline mode, the application dispatches requests using
+     <xref linkend="libpq-PQsendQuery"/>,
+     <xref linkend="libpq-PQsendQueryParams"/>,
+     or its prepared-query sibling
+     <xref linkend="libpq-PQsendQueryPrepared"/>.
+     These requests are queued on the client-side until flushed to the server;
+     this occurs when <xref linkend="libpq-PQsendPipeline"/> is used to
+     establish a synchronization point in the pipeline,
+     or when <xref linkend="libpq-PQflush"/> is called.
+     The functions <xref linkend="libpq-PQsendPrepare"/>,
+     <xref linkend="libpq-PQsendDescribePrepared"/>, and
+     <xref linkend="libpq-PQsendDescribePortal"/> also work in pipeline mode.
+     Result processing is described below.
+    </para>
+
+    <para>
+     The server executes statements, and returns results, in the order the
+     client sends them.  The server will begin executing the commands in the
+     pipeline immediately, not waiting for the end of the pipeline.
+     If any statement encounters an error, the server aborts the current
+     transaction and skips processing commands in the pipeline until the
+     next synchronization point established by <function>PQsendPipeline</function>.
+     (This remains true even if the commands in the pipeline would roll back
+     the transaction.)
+     Query processing resumes after the synchronization point.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one; for example, one query may define a table that the next
+     query in the same pipeline uses. Similarly, an application may
+     create a named prepared statement and execute it with later
+     statements in the same pipeline.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-pipeline-results">
+    <title>Processing Results</title>
+
+    <para>
+     To process the result of one query in a pipeline, the application calls
+     <function>PQgetResult</function> repeatedly and handles each result
+     until <function>PQgetResult</function> returns null.
+     The result from the next query in the pipeline may then be retrieved using
+     <function>PQgetResult</function> again and the cycle repeated.
+     The application handles individual statement results as normal.
+     When the results of all the queries in the pipeline have been
+     returned, <function>PQgetResult</function> returns a result
+     containing the status value <literal>PGRES_PIPELINE_SYNC</literal>.
+    </para>
+
+    <para>
+     The client may choose to defer result processing until the complete
+     pipeline has been sent, or interleave that with sending further
+     queries in the pipeline; see <xref linkend="libpq-pipeline-interleave"/>.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     This mode selection is effective only for the query currently
+     being processed. For more information on the use of
+     <function>PQsetSingleRowMode</function>,
+     refer to <xref linkend="libpq-single-row-mode"/>.
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal
+     asynchronous processing except that it may return a
+     <type>PGresult</type> with the new status <literal>PGRES_PIPELINE_SYNC</literal>
+     or <literal>PGRES_PIPELINE_ABORTED</literal>.
+     <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
+     <function>PQsendPipeline</function> after retrieving results for all
+     queries in the pipeline.
+     <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
+     stream result for the first error and all subsequent results
+     until <literal>PGRES_PIPELINE_SYNC</literal>;
+     see <xref linkend="libpq-pipeline-errors"/>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc.
+     operate as normal when processing pipeline results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (except that
+     <function>PQgetResult</function> returns null to indicate that we start
+     returning the results of the next query). The application must keep track
+     of the order in which it sent queries, to associate them with their
+     corresponding results.
+     Applications will typically use a state machine or a FIFO queue for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-pipeline-errors">
+    <title>Error Handling</title>
+
+    <para>
+     From the client perspective, after <function>PQresultStatus</function>
+     returns <literal>PGRES_FATAL_ERROR</literal>,
+     the pipeline is flagged as aborted.
+     <function>PQresultStatus</function> will report a
+     <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
+     operation in an aborted pipeline. The result for
+     <function>PQsendPipeline</function> is reported as
+     <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the pipeline used an implicit transaction, then operations that have
+     already executed are rolled back and operations that were queued to follow
+     the failed operation are skipped entirely. The same behaviour holds if the
+     pipeline starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the pipeline. If a pipeline contains
+     <emphasis>multiple explicit transactions</emphasis>, all transactions that
+     committed prior to the error remain committed, the currently in-progress
+     transaction is aborted, and all subsequent operations are skipped completely,
+     including subsequent transactions.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal> &mdash; only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously, the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+   </sect3>
+
+   <sect3 id="libpq-pipeline-interleave">
+    <title>Interleaving Result Processing and Query Dispatch</title>
+
+    <para>
+     To avoid deadlocks on large pipelines the client should be structured
+     around a non-blocking event loop using operating system facilities
+     such as <function>select</function>, <function>poll</function>,
+     <function>WaitForMultipleObjectsEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work
+     remaining to be dispatched and a queue of work that has been dispatched
+     but not yet had its results processed. When the socket is writable
+     it should dispatch more work. When the socket is readable it should
+     read results and process them, matching them up to the next entry in
+     its expected results queue.  Based on available memory, results from the
+     socket should be read frequently: there's no need to wait until the
+     pipeline end to read the results.  Pipelines should be scoped to logical
+     units of work, usually (but not necessarily) one transaction per pipeline.
+     There's no need to exit pipeline mode and re-enter it between pipelines,
+     or to wait for one pipeline to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state
+     machine to track sent and received work is in
+     <filename>src/test/modules/libpq_pipeline/libpq_pipeline.c</filename>
+     in the PostgreSQL source distribution.
+    </para>
+   </sect3>
+  </sect2>
+
+  <sect2 id="libpq-pipeline-functions">
+   <title>Functions Associated with Pipeline Mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQpipelineStatus">
+     <term><function>PQpipelineStatus</function><indexterm><primary>PQpipelineStatus</primary></indexterm></term>
+
+     <listitem>
+      <para>
+      Returns the current pipeline mode status of the
+      <application>libpq</application> connection.
+<synopsis>
+PGpipelineStatus PQpipelineStatus(const PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <function>PQpipelineStatus</function> can return one of the following values:
+
+       <variablelist>
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_ON</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in
+           pipeline mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_OFF</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is
+           <emphasis>not</emphasis> in pipeline mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_ABORTED</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in pipeline
+           mode and an error occurred while processing the current pipeline.
+           The aborted flag is cleared when <function>PQresultStatus</function>
+           returns <literal>PGRES_PIPELINE_SYNC</literal> at the end of the pipeline.
+           Clients don't usually need this function to
+           verify aborted status, as they can tell that the pipeline is aborted
+           from the <literal>PGRES_PIPELINE_ABORTED</literal> result code.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterPipelineMode">
+     <term><function>PQenterPipelineMode</function><indexterm><primary>PQenterPipelineMode</primary></indexterm></term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter pipeline mode if it is currently idle or
+      already in pipeline mode.
+
+<synopsis>
+int PQenterPipelineMode(PGconn *conn);
+</synopsis>
+
+      </para>
+      <para>
+       Returns 1 for success.
+       Returns 0 and has no effect if the connection is not currently
+       idle, i.e., it has a result ready, or it is waiting for more
+       input from the server, etc.
+       This function does not actually send anything to the server,
+       it just changes the <application>libpq</application> connection
+       state.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitPipelineMode">
+     <term><function>PQexitPipelineMode</function><indexterm><primary>PQexitPipelineMode</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Causes a connection to exit pipeline mode if it is currently in pipeline mode
+       with an empty queue and no pending results.
+<synopsis>
+int PQexitPipelineMode(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success.  Returns 1 and takes no action if not in
+       pipeline mode. If the current statement isn't finished processing
+       or there are results pending for collection with
+       <function>PQgetResult</function>, returns 0 and does nothing.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQsendPipeline">
+     <term><function>PQsendPipeline</function><indexterm><primary>PQsendPipeline</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       and flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQsendPipeline(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2 id="libpq-pipeline-tips">
+   <title>When to Use Pipeline Mode</title>
+
+   <para>
+    Much like asynchronous query mode, there is no meaningful performance
+    overhead when using pipeline mode. It increases client application complexity,
+    and extra caution is required to prevent client/server deadlocks, but
+    pipeline mode can offer considerable performance improvements, in exchange for
+    increased memory usage from leaving state around longer.
+   </para>
+
+   <para>
+    Pipeline mode is most useful when the server is distant, i.e., network latency
+    (<quote>ping time</quote>) is high, and also when many small operations
+    are being performed in rapid succession.  There is usually less benefit
+    in using pipelined commands when each query takes many multiples of the client/server
+    round-trip time to execute.  A 100-statement operation run on a server
+    300ms round-trip-time away would take 30 seconds in network latency alone
+    without pipelining; with pipelining it may spend as little as 0.3s waiting for
+    results from the server.
+   </para>
+
+   <para>
+    Use pipelined commands when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed
+    into operations on sets, or into a <literal>COPY</literal> operation.
+   </para>
+
+   <para>
+    Pipeline mode is not useful when information from one operation is required by
+    the client to produce the next operation. In such cases, the client
+    would have to introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <para>
+    Pipelining is less useful, and more complex, when a single pipeline contains
+    multiple transactions (see <xref linkend="libpq-pipeline-errors"/>).
+   </para>
+
+   <note>
+    <para>
+     The pipeline API was introduced in <productname>PostgreSQL</productname> 14.
+     Pipeline mode is a client-side feature which doesn't require server
+     support, and works on any server that supports the v3 extended query
+     protocol.
+    </para>
+   </note>
+  </sect2>
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4966,6 +5460,13 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <para>
+   When using pipeline mode, single-row mode needs to be activated for each
+   query in the pipeline before retrieving results for that query
+   with <function>PQgetResult</function>.
+   See <xref linkend="libpq-pipeline-mode"/> for more information.
+  </para>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6d46da42e2..012e44c736 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while a libpq connection is in pipeline mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 299d93b241..5dd1e9e936 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1110,6 +1110,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
       row, the last value is kept.
      </para>
 
+     <para>
+      <literal>\gset</literal> and <literal>\aset</literal> cannot be used
+      in pipeline mode, since query results are not immediately
+      fetched in this mode.
+     </para>
+
      <para>
       The following example puts the final account balance from the first query
       into variable <replaceable>abalance</replaceable>, and fills variables
@@ -1270,6 +1276,21 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 </programlisting></para>
     </listitem>
    </varlistentry>
+
+   <varlistentry id='pgbench-metacommand-pipeline'>
+    <term><literal>\startpipeline</literal></term>
+    <term><literal>\endpipeline</literal></term>
+
+    <listitem>
+      <para>
+        These commands delimit the start and end of a pipeline of SQL statements.
+        In a pipeline, statements are sent to the server without waiting for the results
+        of previous statements (see <xref linkend="libpq-pipeline-mode"/>).
+        Pipeline mode requires the extended query protocol.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect2>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5272eed9ab..f74378110a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->err = _("empty query");
 			break;
 
+		case PGRES_PIPELINE_SYNC:
+		case PGRES_PIPELINE_ABORTED:
+			walres->status = WALRCV_ERROR;
+			walres->err = _("unexpected pipeline mode");
+			break;
+
 		case PGRES_NONFATAL_ERROR:
 		case PGRES_FATAL_ERROR:
 		case PGRES_BAD_RESPONSE:
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index f6a214669c..b7400708fc 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -395,10 +395,11 @@ typedef enum
 	 *
 	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
 	 * command, the command is sent to the server, and we move to
-	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
-	 * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
-	 * meta-commands are executed immediately.  If the command about to start
-	 * is actually beyond the end of the script, advance to CSTATE_END_TX.
+	 * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
+	 * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
+	 * wait for it to expire. Other meta-commands are executed immediately. If
+	 * the command about to start is actually beyond the end of the script,
+	 * advance to CSTATE_END_TX.
 	 *
 	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
 	 * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
 	META_IF,					/* \if */
 	META_ELIF,					/* \elif */
 	META_ELSE,					/* \else */
-	META_ENDIF					/* \endif */
+	META_ENDIF,					/* \endif */
+	META_STARTPIPELINE,			/* \startpipeline */
+	META_ENDPIPELINE			/* \endpipeline */
 } MetaCommand;
 
 typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
 		mc = META_GSET;
 	else if (pg_strcasecmp(cmd, "aset") == 0)
 		mc = META_ASET;
+	else if (pg_strcasecmp(cmd, "startpipeline") == 0)
+		mc = META_STARTPIPELINE;
+	else if (pg_strcasecmp(cmd, "endpipeline") == 0)
+		mc = META_ENDPIPELINE;
 	else
 		mc = META_NONE;
 	return mc;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
 				if (commands[j]->type != SQL_COMMAND)
 					continue;
 				preparedStatementName(name, st->use_file, j);
-				res = PQprepare(st->con, name,
-								commands[j]->argv[0], commands[j]->argc - 1, NULL);
-				if (PQresultStatus(res) != PGRES_COMMAND_OK)
-					pg_log_error("%s", PQerrorMessage(st->con));
-				PQclear(res);
+				if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+				{
+					res = PQprepare(st->con, name,
+									commands[j]->argv[0], commands[j]->argc - 1, NULL);
+					if (PQresultStatus(res) != PGRES_COMMAND_OK)
+						pg_log_error("%s", PQerrorMessage(st->con));
+					PQclear(res);
+				}
+				else
+				{
+					/*
+					 * In pipeline mode, we use asynchronous functions. If a
+					 * server-side error occurs, it will be processed later
+					 * among the other results.
+					 */
+					if (!PQsendPrepare(st->con, name,
+									   commands[j]->argv[0], commands[j]->argc - 1, NULL))
+						pg_log_error("%s", PQerrorMessage(st->con));
+				}
 			}
 			st->prepared[st->use_file] = true;
 		}
@@ -2805,8 +2826,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
 	 * varprefix should be set only with \gset or \aset, and SQL commands do
 	 * not need it.
 	 */
+#if 0
 	Assert((meta == META_NONE && varprefix == NULL) ||
 		   ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
+#endif
 
 	res = PQgetResult(st->con);
 
@@ -2874,6 +2897,12 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
 				/* otherwise the result is simply thrown away by PQclear below */
 				break;
 
+			case PGRES_PIPELINE_SYNC:
+				pg_log_debug("client %d pipeline ending", st->id);
+				if (PQexitPipelineMode(st->con) != 1)
+					pg_log_error("client %d failed to exit pipeline mode", st->id);
+				break;
+
 			default:
 				/* anything else is unexpected */
 				pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				/* Execute the command */
 				if (command->type == SQL_COMMAND)
 				{
+					/* disallow \aset and \gset in pipeline mode */
+					if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+					{
+						if (command->meta == META_GSET)
+						{
+							commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
+							st->state = CSTATE_ABORTED;
+							break;
+						}
+						else if (command->meta == META_ASET)
+						{
+							commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
+							st->state = CSTATE_ABORTED;
+							break;
+						}
+					}
+
 					if (!sendCommand(st, command))
 					{
 						commandFailed(st, "SQL", "SQL command send failed");
 						st->state = CSTATE_ABORTED;
 					}
 					else
-						st->state = CSTATE_WAIT_RESULT;
+					{
+						/* Wait for results, unless in pipeline mode */
+						if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+							st->state = CSTATE_WAIT_RESULT;
+						else
+							st->state = CSTATE_END_COMMAND;
+					}
 				}
 				else if (command->type == META_COMMAND)
 				{
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				if (readCommandResponse(st,
 										sql_script[st->use_file].commands[st->command]->meta,
 										sql_script[st->use_file].commands[st->command]->varprefix))
-					st->state = CSTATE_END_COMMAND;
+				{
+					/*
+					 * outside of pipeline mode: stop reading results.
+					 * pipeline mode: continue reading results until an
+					 * end-of-pipeline response.
+					 */
+					if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+						st->state = CSTATE_END_COMMAND;
+				}
 				else
 					st->state = CSTATE_ABORTED;
 				break;
@@ -3516,6 +3576,49 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 			return CSTATE_ABORTED;
 		}
 	}
+	else if (command->meta == META_STARTPIPELINE)
+	{
+		/*
+		 * In pipeline mode, we use a workflow based on libpq pipeline
+		 * functions.
+		 */
+		if (querymode == QUERY_SIMPLE)
+		{
+			commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
+			return CSTATE_ABORTED;
+		}
+
+		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+		{
+			commandFailed(st, "startpipeline", "already in pipeline mode");
+			return CSTATE_ABORTED;
+		}
+		if (PQenterPipelineMode(st->con) == 0)
+		{
+			commandFailed(st, "startpipeline", "failed to enter pipeline mode");
+			return CSTATE_ABORTED;
+		}
+	}
+	else if (command->meta == META_ENDPIPELINE)
+	{
+		if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+		{
+			commandFailed(st, "endpipeline", "not in pipeline mode");
+			return CSTATE_ABORTED;
+		}
+		if (!PQsendPipeline(st->con))
+		{
+			commandFailed(st, "endpipeline", "failed to send the pipeline");
+			return CSTATE_ABORTED;
+		}
+		if (!PQexitPipelineMode(st->con))
+		{
+			commandFailed(st, "endpipeline", "failed to exit pipeline mode");
+			return CSTATE_ABORTED;
+		}
+		/* collect pending results before getting out of pipeline mode */
+		return CSTATE_WAIT_RESULT;
+	}
 
 	/*
 	 * executing the expression or shell command might have taken a
@@ -4725,7 +4828,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
 			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
 						 "missing command", NULL, -1);
 	}
-	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
+	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
+			 my_command->meta == META_STARTPIPELINE ||
+			 my_command->meta == META_ENDPIPELINE)
 	{
 		if (my_command->argc != 1)
 			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..60d09e6d63 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterPipelineMode       180
+PQexitPipelineMode        181
+PQsendPipeline            182
+PQpipelineStatus          183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 4e21057d0f..a714e9fc53 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
 	}
 }
 
+/*
+ * pqFreeCommandQueue
+ * Free all the entries of the PGcommandQueueEntry queue passed in.
+ */
+static void
+pqFreeCommandQueue(PGcommandQueueEntry *queue)
+{
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *cur = queue;
+
+		queue = cur->next;
+		if (cur->query)
+			free(cur->query);
+		free(cur);
+	}
+}
 
 /*
  *		pqDropServerData
@@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
 
+	pqFreeCommandQueue(conn->cmd_queue_head);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	pqFreeCommandQueue(conn->cmd_queue_recycle);
+	conn->cmd_queue_recycle = NULL;
+
 	/* Reset ParameterStatus data, as well as variables deduced from it */
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
@@ -2459,6 +2482,7 @@ keep_going:						/* We will come back to here until there is
 		/* Drop any PGresult we might have, too */
 		conn->asyncStatus = PGASYNC_IDLE;
 		conn->xactStatus = PQTRANS_IDLE;
+		conn->pipelineStatus = PQ_PIPELINE_OFF;
 		pqClearAsyncResult(conn);
 
 		/* Reset conn->status to put the state machine in the right state */
@@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
 
 	conn->status = CONNECTION_BAD;
 	conn->asyncStatus = PGASYNC_IDLE;
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
 	conn->xactStatus = PQTRANS_IDLE;
 	conn->options_valid = false;
 	conn->nonblocking = false;
@@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
 	if (conn->connip)
 		free(conn->connip);
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->write_err_msg)
 		free(conn->write_err_msg);
 	if (conn->inBuffer)
@@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
 	conn->status = CONNECTION_BAD;	/* Well, not really _bad_ - just absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	conn->xactStatus = PQTRANS_IDLE;
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
 	pqClearAsyncResult(conn);	/* deallocate result */
 	resetPQExpBuffer(&conn->errorMessage);
 	release_conn_addrinfo(conn);
@@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
 	return conn->be_pid;
 }
 
+PGpipelineStatus
+PQpipelineStatus(const PGconn *conn)
+{
+	if (!conn)
+		return PQ_PIPELINE_OFF;
+
+	return conn->pipelineStatus;
+}
+
 int
 PQconnectionNeedsPassword(const PGconn *conn)
 {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 9a038043b2..0e964979bc 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_PIPELINE_SYNC",
+	"PGRES_PIPELINE_ABORTED"
 };
 
 /*
@@ -71,6 +73,12 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int	PQsendDescribe(PGconn *conn, char desc_type,
 						   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn);
+static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+void pqCommandQueueAdvance(PGconn *conn);
+static void pqPipelineProcessQueue(PGconn *conn);
+static int	pqPipelineFlush(PGconn *conn);
 
 
 /* ----------------
@@ -1171,7 +1179,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1209,9 +1217,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
 static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
+	PGcommandQueueEntry *entry = NULL;
+
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
 
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
+
 	/* check the argument */
 	if (!query)
 	{
@@ -1220,37 +1234,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 		return 0;
 	}
 
-	/* construct the outgoing Query message */
-	if (pqPutMsgStart('Q', conn) < 0 ||
-		pqPuts(query, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
+	/* Send the query message(s) */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
-		/* error message should be set up already */
-		return 0;
+		/* construct the outgoing Query message */
+		if (pqPutMsgStart('Q', conn) < 0 ||
+			pqPuts(query, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+		{
+			/* error message should be set up already */
+			return 0;
+		}
+
+		/* remember we are using simple query protocol */
+		entry->queryclass = PGQUERY_SIMPLE;
+		/* and remember the query text too, if possible */
+		entry->query = strdup(query);
 	}
+	else
+	{
+		/*
+		 * In pipeline mode, we cannot use the simple protocol, so we send
+		 * Parse, Bind, Describe Portal, Execute.
+		 */
+		if (pqPutMsgStart('P', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPuts(query, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('B', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('D', conn) < 0 ||
+			pqPutc('P', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('E', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutInt(0, 4, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
 
-	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
-	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+		entry->queryclass = PGQUERY_EXTENDED;
+		entry->query = strdup(query);
+	}
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
-	{
-		/* error message should be set up already */
-		return 0;
-	}
+	if (pqPipelineFlush(conn) < 0)
+		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendPipelineCmd(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
+
+sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
+	/* error message should be set up already */
+	return 0;
 }
 
 /*
@@ -1307,6 +1359,8 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *entry = NULL;
+
 	if (!PQsendQueryStart(conn, true))
 		return 0;
 
@@ -1330,6 +1384,10 @@ PQsendPrepare(PGconn *conn,
 		return 0;
 	}
 
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
+
 	/* construct the Parse message */
 	if (pqPutMsgStart('P', conn) < 0 ||
 		pqPuts(stmtName, conn) < 0 ||
@@ -1356,32 +1414,42 @@ PQsendPrepare(PGconn *conn,
 	if (pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	/*
+	 * In non-pipeline mode, add a Sync and prepare to send.  In pipeline mode
+	 * we just keep track of the new message.
+	 */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	entry->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, query just winds up NULL */
+	entry->query = strdup(query);
+
+	pqAppendPipelineCmd(conn, entry);
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
-	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1429,7 +1497,8 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn, bool newQuery)
@@ -1450,20 +1519,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 							 libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		/*
+		 * When enqueuing commands we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when pqPipelineProcessQueue()
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_IDLE:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				appendPQExpBufferStr(&conn->errorMessage,
+									 libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+		}
+	}
+	else
+	{
+		/*
+		 * This command's results will come in immediately. Initialize async
+		 * result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1487,10 +1593,16 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *entry;
+
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
 
 	/*
-	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
-	 * using specified statement name and the unnamed portal.
+	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
+	 * (if not in pipeline mode), using specified statement name and the
+	 * unnamed portal.
 	 */
 
 	if (command)
@@ -1600,35 +1712,38 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	/* construct the Sync message if not in pipeline mode */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	entry->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, query just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		entry->query = strdup(command);
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendPipelineCmd(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1647,8 +1762,9 @@ PQsetSingleRowMode(PGconn *conn)
 		return 0;
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (!conn->cmd_queue_head ||
+		(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
+		 conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
 		return 0;
 	if (conn->result)
 		return 0;
@@ -1726,14 +1842,17 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
-
 /*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
+ *
+ *	  In pipeline mode, once all the results of a query have been returned,
+ *	  PQgetResult returns NULL to let the user know that the next
+ *	  query is being processed.  At the end of the pipeline, returns a
+ *	  result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1803,8 +1922,60 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
+			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * We're about to return the NULL that terminates the round of
+				 * results from the current query; prepare to send the results
+				 * of the next query when we're called next.  Also, since this
+				 * is the start of the results of the next query, clear any
+				 * prior error message.
+				 */
+				resetPQExpBuffer(&conn->errorMessage);
+				pqPipelineProcessQueue(conn);
+			}
 			break;
 		case PGASYNC_READY:
+			/*
+			 * For any query type other than simple query protocol, we advance
+			 * the command queue here.  For simple query protocol, we can get
+			 * the READY state multiple times before the command is actually
+			 * complete, since the string can contain many queries -- so we
+			 * wait till we receive ReadyForQuery.
+			 */
+			if (conn->cmd_queue_head &&
+				conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
+				pqCommandQueueAdvance(conn);
+			res = pqPrepareAsyncResult(conn);
+			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * We're about to send the results of the current query.  Set
+				 * us idle now, and ...
+				 */
+				conn->asyncStatus = PGASYNC_IDLE;
+
+				/*
+				 * ... in cases when we're sending a pipeline-sync result,
+				 * move queue processing forwards immediately, so that next
+				 * time we're called, we're prepared to return the next result
+				 * received from the server.  In all other cases, leave the
+				 * queue state change for next time, so that a terminating NULL
+				 * result is sent.
+				 *
+				 * In other words: we don't return a NULL after a pipeline
+				 * sync.
+				 */
+				if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
+					pqPipelineProcessQueue(conn);
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
 			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
@@ -1985,6 +2156,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
+		return false;
+	}
+
 	/*
 	 * Since this is the beginning of a query cycle, reset the error buffer.
 	 */
@@ -2148,6 +2326,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *entry = NULL;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2155,6 +2335,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 	if (!PQsendQueryStart(conn, true))
 		return 0;
 
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2163,32 +2347,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
-
-	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last_query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
-		free(conn->last_query);
-		conn->last_query = NULL;
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
 	}
 
+	/* remember we are doing a Describe */
+	entry->queryclass = PGQUERY_DESCRIBE;
+
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendPipelineCmd(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -2327,7 +2511,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 	 * If we sent the COPY command in extended-query mode, we must issue a
 	 * Sync as well.
 	 */
-	if (conn->queryclass != PGQUERY_SIMPLE)
+	if (conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
 	{
 		if (pqPutMsgStart('S', conn) < 0 ||
 			pqPutMsgEnd(conn) < 0)
@@ -2541,6 +2726,13 @@ PQfn(PGconn *conn,
 	 */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("PQfn not allowed in pipeline mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -2555,6 +2747,365 @@ PQfn(PGconn *conn,
 						   args, nargs);
 }
 
+/* ====== Pipeline mode support ======== */
+
+/*
+ * PQenterPipelineMode
+ *		Put an idle connection in pipeline mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQsendPipeline. Multiple pipelines
+ * can be sent while in pipeline mode.  Pipeline mode can be exited
+ * by calling PQexitPipelineMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire; it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterPipelineMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* succeed with no action if already in pipeline mode */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
+		return 0;
+	}
+
+	conn->pipelineStatus = PQ_PIPELINE_ON;
+
+	return 1;
+}
+
+/*
+ * PQexitPipelineMode
+ *		End pipeline mode and return to normal command mode.
+ *
+ * Returns 1 on success (pipeline mode successfully ended, or not in pipeline
+ * mode).
+ *
+ * Returns 0 if in pipeline mode and cannot be ended yet.  Error message will
+ * be set.
+ */
+int
+PQexitPipelineMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+			/* there are some uncollected results */
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+			return 0;
+
+		case PGASYNC_BUSY:
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("cannot exit pipeline mode while busy\n"));
+			return 0;
+
+		default:
+			/* OK */
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+		return 0;
+	}
+
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		return 0;				/* error message is set up already */
+	return 1;
+}
+
+/*
+ * pqCommandQueueAdvance
+ *		Remove one query from the command queue, when we receive
+ *		all results from the server that pertain to it.
+ */
+void
+pqCommandQueueAdvance(PGconn *conn)
+{
+	PGcommandQueueEntry *prevquery;
+
+	if (conn->cmd_queue_head == NULL)
+		return;
+
+	/* delink from queue */
+	prevquery = conn->cmd_queue_head;
+	conn->cmd_queue_head = conn->cmd_queue_head->next;
+
+	/* and make it recyclable */
+	prevquery->next = NULL;
+	pqRecyclePipelineCmd(conn, prevquery);
+}
+
+/*
+ * pqPipelineProcessQueue: subroutine for PQgetResult
+ *		In pipeline mode, start processing the results of the next query in the queue.
+ */
+void
+pqPipelineProcessQueue(PGconn *conn)
+{
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return;
+		case PGASYNC_IDLE:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/*
+		 * XXX rewrite this comment.  In pipeline mode but nothing left on the
+		 * queue; caller can submit more work or PQexitPipelineMode() now.
+		 */
+		return;
+	}
+
+	/* In non-pipeline mode, we're done here */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		return;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/*
+	 * Reset single-row processing mode.  (Client has to set it up for each
+	 * query, if desired.)
+	 */
+	conn->singleRowMode = false;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
+		conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted pipeline we don't get anything from the server for
+		 * each result; we're just discarding commands from the queue until we
+		 * get to the next sync from the server.
+		 *
+		 * The PGRES_PIPELINE_ABORTED results tell the client that its queries
+		 * got aborted.
+		 */
+		conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
+		if (!conn->result)
+		{
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("out of memory\n"));
+			pqSaveErrorResult(conn);
+			return;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+}
+
+/*
+ * PQsendPipeline
+ *		Send a Sync message as part of a pipeline, and flush to server
+ *
+ * It's legal to start submitting more commands in the pipeline immediately,
+ * without waiting for the results of the current pipeline. There's no need to
+ * end pipeline mode and start it again.
+ *
+ * If a command in a pipeline fails, every subsequent command up to and including
+ * the result to the Sync message sent by PQsendPipeline gets set to
+ * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
+ * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ *
+ * Queries can already have been sent before PQsendPipeline is called, but
+ * PQsendPipeline needs to be called before retrieving command results.
+ *
+ * The connection will remain in pipeline mode and unavailable for new
+ * synchronous command execution functions until all results from the pipeline
+ * are processed by the client.
+ */
+int
+PQsendPipeline(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
+		return 0;
+	}
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			/* should be unreachable */
+			appendPQExpBufferStr(&conn->errorMessage,
+								 "internal error: cannot send pipeline while in COPY\n");
+			return 0;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
+			/* OK to send sync */
+			break;
+	}
+
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	pqAppendPipelineCmd(conn, entry);
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Call pqPipelineProcessQueue so the user can start calling
+	 * PQgetResult.
+	 */
+	pqPipelineProcessQueue(conn);
+
+	return 1;
+
+sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
+
+/*
+ * pqMakePipelineCmd
+ *		Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated.  Caller is responsible for adding it to the
+ * command queue (pqAppendPipelineCmd) once the struct is filled in, or
+ * releasing the memory (pqRecyclePipelineCmd) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcommandQueueEntry *
+pqMakePipelineCmd(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * pqAppendPipelineCmd
+ *		Append a caller-allocated command queue entry to the queue.
+ *
+ * The query itself must already have been put in the output buffer by the
+ * caller.
+ */
+static void
+pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	Assert(entry->next == NULL);
+
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecyclePipelineCmd
+ *		Push a command queue entry onto the freelist.
+ */
+static void
+pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+
+	/* recyclable entries should not have a follow-on command */
+	Assert(entry->next == NULL);
+
+	if (entry->query)
+	{
+		free(entry->query);
+		entry->query = NULL;
+	}
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
 
 /* ====== accessor funcs for PGresult ======== */
 
@@ -2569,7 +3120,7 @@ PQresultStatus(const PGresult *res)
 char *
 PQresStatus(ExecStatusType status)
 {
-	if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
+	if ((unsigned int) status >= lengthof(pgresStatus))
 		return libpq_gettext("invalid ExecStatusType code");
 	return pgresStatus[status];
 }
@@ -3152,6 +3703,23 @@ PQflush(PGconn *conn)
 	return pqFlush(conn);
 }
 
+/*
+ * pqPipelineFlush
+ *
+ * In pipeline mode, data will be flushed only when the out buffer reaches the
+ * threshold value.  In non-pipeline mode, it behaves as stock pqFlush.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqPipelineFlush(PGconn *conn)
+{
+	if ((conn->pipelineStatus == PQ_PIPELINE_OFF) ||
+		(conn->outCount >= OUTBUFFER_THRESHOLD))
+		return pqFlush(conn);
+	return 0;
+}
+
 
 /*
  *		PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 2ca8c057b9..38dc1e4efa 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -177,14 +177,24 @@ pqParseInput3(PGconn *conn)
 				if (getParameterStatus(conn))
 					return;
 			}
-			else
-			{
-				pqInternalNotice(&conn->noticeHooks,
-								 "message type 0x%02x arrived from server while idle",
-								 id);
-				/* Discard the unexpected message */
-				conn->inCursor += msgLength;
-			}
+
+			/*
+			 * We're notionally not-IDLE when, in pipeline mode, we have
+			 * completed processing the results of one query and are waiting
+			 * for the next one in the pipeline.  In this case, as above, just
+			 * wait.
+			 */
+			if (conn->asyncStatus == PGASYNC_IDLE &&
+				conn->pipelineStatus != PQ_PIPELINE_OFF &&
+				conn->cmd_queue_head != NULL)
+				return;
+
+			/* Any other case is unexpected and we summarily skip it */
+			pqInternalNotice(&conn->noticeHooks,
+							 "message type 0x%02x arrived from server while idle",
+							 id);
+			/* Discard the unexpected message */
+			conn->inCursor += msgLength;
 		}
 		else
 		{
@@ -217,10 +227,37 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new
+								 * query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+					{
+						conn->result = PQmakeEmptyPGresult(conn,
+														   PGRES_PIPELINE_SYNC);
+						if (!conn->result)
+						{
+							appendPQExpBufferStr(&conn->errorMessage,
+												 libpq_gettext("out of memory"));
+							pqSaveErrorResult(conn);
+						}
+						else
+						{
+							conn->pipelineStatus = PQ_PIPELINE_ON;
+							conn->asyncStatus = PGASYNC_READY;
+						}
+					}
+					else
+					{
+						/*
+						 * In simple query protocol, advance the command queue
+						 * (see PQgetResult).
+						 */
+						if (conn->cmd_queue_head &&
+							conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
+							pqCommandQueueAdvance(conn);
+						conn->asyncStatus = PGASYNC_IDLE;
+					}
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -238,7 +275,8 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -285,7 +323,8 @@ pqParseInput3(PGconn *conn)
 						conn->inCursor += msgLength;
 					}
 					else if (conn->result == NULL ||
-							 conn->queryclass == PGQUERY_DESCRIBE)
+							 (conn->cmd_queue_head &&
+							  conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -318,7 +357,8 @@ pqParseInput3(PGconn *conn)
 					 * instead of PGRES_TUPLES_OK.  Otherwise we can just
 					 * ignore this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -450,7 +490,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 					  id, msgLength);
 	/* build an error result holding the error message */
 	pqSaveErrorResult(conn);
-	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
+	conn->asyncStatus = PGASYNC_READY;	/* drop out of PQgetResult wait loop */
 	/* flush input data since we're giving up on processing it */
 	pqDropConnection(conn, true);
 	conn->status = CONNECTION_BAD;	/* No more connection to backend */
@@ -477,7 +517,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (!conn->cmd_queue_head ||
+		(conn->cmd_queue_head &&
+		 conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 	{
 		if (conn->result)
 			result = conn->result;
@@ -584,7 +626,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if ((!conn->cmd_queue_head) ||
+		(conn->cmd_queue_head &&
+		 conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -875,6 +919,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	/* If in pipeline mode, set error indicator for it */
+	if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
+		conn->pipelineStatus = PQ_PIPELINE_ABORTED;
+
 	/*
 	 * If this is an error message, pre-emptively clear any incomplete query
 	 * result we may have.  We'd just throw it away below anyway, and
@@ -931,8 +979,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	 * might need it for an error cursor display, which is only true if there
 	 * is a PG_DIAG_STATEMENT_POSITION field.
 	 */
-	if (have_position && conn->last_query && res)
-		res->errQuery = pqResultStrdup(res, conn->last_query);
+	if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
+		res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
 
 	/*
 	 * Now build the "overall" error message for PQresultErrorMessage.
@@ -1851,7 +1899,8 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->cmd_queue_head &&
+			conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
@@ -1931,6 +1980,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 	int			avail;
 	int			i;
 
+	/* already validated by PQfn */
+	Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
+
 	/* PQfn already validated connection state */
 
 	if (pqPutMsgStart('F', conn) < 0 || /* function call msg */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index fa9b62a844..fb31a49fab 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -96,7 +96,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_PIPELINE_SYNC,		/* pipeline synchronization point */
+	PGRES_PIPELINE_ABORTED,		/* Command didn't run because of an abort
+								 * earlier in a pipeline */
 } ExecStatusType;
 
 typedef enum
@@ -136,6 +139,16 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PGpipelineStatus - Current status of pipeline mode
+ */
+typedef enum
+{
+	PQ_PIPELINE_OFF,
+	PQ_PIPELINE_ON,
+	PQ_PIPELINE_ABORTED
+} PGpipelineStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -327,6 +340,7 @@ extern int	PQserverVersion(const PGconn *conn);
 extern char *PQerrorMessage(const PGconn *conn);
 extern int	PQsocket(const PGconn *conn);
 extern int	PQbackendPID(const PGconn *conn);
+extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
 extern int	PQconnectionNeedsPassword(const PGconn *conn);
 extern int	PQconnectionUsedPassword(const PGconn *conn);
 extern int	PQclientEncoding(const PGconn *conn);
@@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for pipeline mode management */
+extern int	PQenterPipelineMode(PGconn *conn);
+extern int	PQexitPipelineMode(PGconn *conn);
+extern int	PQsendPipeline(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 2f052f61f8..9b02e54f0d 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,7 +217,11 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result, more results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
 	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
@@ -229,7 +233,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* Sync at end of a pipeline */
 } PGQueryClass;
 
 /* Target server type (decoded value of target_session_attrs) */
@@ -305,6 +310,22 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue.  Used to keep track of the expected
+ * results of commands we've dispatched, whether or not in pipeline mode.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct PGcommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type */
+	char	   *query;			/* SQL command, or NULL if none/unknown/OOM */
+	struct PGcommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -389,12 +410,11 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
 	char		last_sqlstate[6];	/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PGpipelineStatus pipelineStatus;	/* status of pipeline mode */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -407,6 +427,16 @@ struct pg_conn
 	pg_conn_host *connhost;		/* details about each named host */
 	char	   *connip;			/* IP address for current network connection */
 
+	/*
+	 * The command queue; all commands go through it, in pipeline mode or not.
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -795,6 +825,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold (in bytes), used to flush in pipeline mode.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 5391f461a2..93e7829c67 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -10,6 +10,7 @@ SUBDIRS = \
 		  delay_execution \
 		  dummy_index_am \
 		  dummy_seclabel \
+		  libpq_pipeline \
 		  plsample \
 		  snapshot_too_old \
 		  test_bloomfilter \
diff --git a/src/test/modules/libpq_pipeline/.gitignore b/src/test/modules/libpq_pipeline/.gitignore
new file mode 100644
index 0000000000..3a11e786b8
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/libpq_pipeline
diff --git a/src/test/modules/libpq_pipeline/Makefile b/src/test/modules/libpq_pipeline/Makefile
new file mode 100644
index 0000000000..b798f5fbbc
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/libpq_pipeline/Makefile
+
+PROGRAM = libpq_pipeline
+OBJS = libpq_pipeline.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS_INTERNAL += $(libpq_pgport)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/libpq_pipeline
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/README
@@ -0,0 +1 @@
+Test program for libpq pipeline mode
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
new file mode 100644
index 0000000000..8819df4d92
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -0,0 +1,1160 @@
+/*
+ * src/test/modules/libpq_pipeline/libpq_pipeline.c
+ *		Verify libpq pipeline execution functionality
+ */
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "libpq_pipeline";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define	pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS pq_pipeline_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+	va_list		args;
+
+
+	fflush(stdout);
+
+	fprintf(stderr, "\n%s:%d: ", progname, line);
+	va_start(args, fmt);
+	vfprintf(stderr, fmt, args);
+	va_end(args);
+	Assert(fmt[strlen(fmt) - 1] != '\n');
+	fprintf(stderr, "\n");
+	exit(1);
+}
+
+static void
+test_disallowed(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("Unable to enter pipeline mode");
+
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Pipeline mode not activated properly");
+
+	/* PQexec should fail in pipeline mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQexec should fail in pipeline mode but succeeded");
+
+	/* Entering pipeline mode when already in pipeline mode is OK */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("re-entering pipeline mode should be a no-op but failed");
+
+	if (PQisBusy(conn) != 0)
+		pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
+
+	/* ok, back to normal command mode */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("couldn't exit idle empty pipeline mode");
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("Pipeline mode not terminated properly");
+
+	/* exiting pipeline mode when not in pipeline mode should be a no-op */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
+				 PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_pipeline(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple pipeline... ");
+
+	/*
+	 * Enter pipeline mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our output buffer will give us enough slack
+	 * to work with and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
+
+	if (PQsendPipeline(conn) != 1)
+		pg_fatal("Sending pipeline failed: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first query result.");
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit pipeline mode yet
+	 */
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	PQclear(res);
+	res = NULL;
+
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("PQgetResult returned something extra after pipeline end: %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* ... until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("Exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_pipelines(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi pipeline... ");
+
+	/*
+	 * Queue up a couple of small pipelines and process each without returning
+	 * to command mode first.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQsendPipeline(conn) != 1)
+		pg_fatal("Ending first pipeline failed: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQsendPipeline(conn) != 1)
+		pg_fatal("Ending second pipeline failed: %s", PQerrorMessage(conn));
+
+	/* OK, start processing the results */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+#if 0
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+#endif
+
+	/* second pipeline */
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from second pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in pipeline mode ... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a pipeline fails the rest of the pipeline is flushed. We
+ * still have to get results for each pipeline item, but the item will just be
+ * a PGRES_PIPELINE_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the pipeline. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_aborted_pipeline(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+	bool		goterror;
+
+	fprintf(stderr, "aborted pipeline... ");
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
+
+	res = PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
+
+	/*
+	 * Queue up a couple of small pipelines and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * pipeline ERRORs.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "1";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "2";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
+
+	if (PQsendPipeline(conn) != 1)
+		pg_fatal("Sending first pipeline failed: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "3";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second-pipeline insert failed: %s",
+				 PQerrorMessage(conn));
+
+	if (PQsendPipeline(conn) != 1)
+		pg_fatal("Ending second pipeline failed: %s", PQerrorMessage(conn));
+
+	/*
+	 * OK, start processing the pipeline results.
+	 *
+	 * We should get a command-ok for the first insert, then a fatal error for
+	 * the failing select, a pipeline-aborted result for the second insert and
+	 * a pipeline sync; then a command-ok and a sync for the second pipeline.
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result status %s: %s",
+				 PQresStatus(PQresultStatus(res)),
+				 PQresultErrorMessage(res));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* Second query caused error, so we expect an error next */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/*
+	 * pipeline should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third pipeline since we've already sent a
+	 * second. The aborted flag relates only to the pipeline being received.
+	 */
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+		pg_fatal("pipeline should be flagged as aborted but isn't");
+
+	/* third query in pipeline, the second insert */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
+		pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+		pg_fatal("pipeline should be flagged as aborted but isn't");
+
+	/* Ensure we're still in pipeline */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/*
+	 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
+	 *
+	 * (This is so clients know to start processing results normally again and
+	 * can tell the difference between skipped commands and the sync.)
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code from first pipeline sync\n"
+				 "Expected PGRES_PIPELINE_SYNC, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+#if 0
+	/* after the synchronization point we get a NULL result */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+#endif
+
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
+		pg_fatal("sync should've cleared the aborted flag but didn't");
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* the insert from the second pipeline */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result code %s from first item in second pipeline",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* Read the NULL result at the end of the command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+	/* the second pipeline sync */
+	if ((res = PQgetResult(conn)) == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from second pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s: %s",
+				 PQresStatus(PQresultStatus(res)),
+				 PQerrorMessage(conn));
+
+	/* Test single-row mode with an error partway */
+	if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendPipeline(conn);
+	PQsetSingleRowMode(conn);
+	goterror = false;
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		switch (PQresultStatus(res))
+		{
+			case PGRES_SINGLE_TUPLE:
+				printf("got row: %s\n", PQgetvalue(res, 0, 0));
+				break;
+			case PGRES_FATAL_ERROR:
+				if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
+					pg_fatal("expected division-by-zero, got: %s",
+							 PQresultErrorField(res, PG_DIAG_SQLSTATE));
+				printf("got expected division-by-zero\n");
+				goterror = true;
+				break;
+			default:
+				pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
+		}
+		PQclear(res);
+	}
+	if (!goterror)
+		pg_fatal("did not get division-by-zero error");
+	/* the third pipeline sync */
+	if ((res = PQgetResult(conn)) == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from third pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+
+	/*-
+	 * Since we fired the pipelines off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - Implicit xact started by server around 1st pipeline
+	 * - First insert applied
+	 * - Second statement aborted xact
+	 * - Third insert skipped
+	 * - Sync rolled back first implicit xact
+	 * - Implicit xact created by server around 2nd pipeline
+	 * - insert applied from 2nd pipeline
+	 * - Sync commits 2nd xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		if (strcmp(val, "3") != 0)
+			pg_fatal("expected only insert with value 3, got %s", val);
+	}
+
+	PQclear(res);
+}
+
+/* State machine enum for test_pipelined_insert */
+typedef enum PipelineInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+} PipelineInsertStep;
+
+static void
+test_pipelined_insert(PGconn *conn, int n_rows)
+{
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	PipelineInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a pipelined insert into a table created at the start of the pipeline
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	while (send_step != BI_PREPARE)
+	{
+		const char *sql;
+
+		switch (send_step)
+		{
+			case BI_BEGIN_TX:
+				sql = "BEGIN TRANSACTION";
+				send_step = BI_DROP_TABLE;
+				break;
+
+			case BI_DROP_TABLE:
+				sql = drop_table_sql;
+				send_step = BI_CREATE_TABLE;
+				break;
+
+			case BI_CREATE_TABLE:
+				sql = create_table_sql;
+				send_step = BI_PREPARE;
+				break;
+
+			default:
+				pg_fatal("invalid state");
+		}
+
+		pg_debug("sending: %s\n", sql);
+		if (PQsendQueryParams(conn, sql,
+							  0, NULL, NULL, NULL, NULL, 0) != 1)
+			pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
+	}
+
+	Assert(send_step == BI_PREPARE);
+	pg_debug("sending: %s\n", insert_sql);
+	if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
+		pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our output buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the pipeline before processing
+	 * results.
+	 */
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any results, so we keep the server's output buffer
+		 * free-flowing and it can continue to process input
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				PGresult   *res;
+				const char *cmdtag;
+				const char *description = "";
+				int			status;
+
+				/*
+				 * Read next result.  If no more results from this query,
+				 * advance to the next query
+				 */
+				res = PQgetResult(conn);
+				if (res == NULL)
+					continue;
+
+				status = PGRES_COMMAND_OK;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						recv_step++;
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						recv_step++;
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						recv_step++;
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						recv_step++;
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive == 0)
+							recv_step++;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						recv_step++;
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_PIPELINE_SYNC;
+						recv_step++;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						description = "";
+						abort();
+				}
+
+				if (PQresultStatus(res) != status)
+					pg_fatal("%s reported status %s, expected %s\n"
+							 "Error message: \"%s\"",
+							 description, PQresStatus(PQresultStatus(res)),
+							 PQresStatus(status), PQerrorMessage(conn));
+
+				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+					pg_fatal("%s expected command tag '%s', got '%s'",
+							 description, cmdtag, PQcmdStatus(res));
+
+				pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+				PQclear(res);
+			}
+		}
+
+		/* Write more rows and/or the end pipeline message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent row %d\n", rows_to_send);
+
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent COMMIT\n");
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQsendPipeline(conn) == 1)
+				{
+					fprintf(stdout, "Sent pipeline\n");
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending pipeline failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+	}
+
+	/* We've got the sync message and the pipeline should be done */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQsetnonblocking(conn, 0) != 0)
+		pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+	PGresult   *res;
+	int			i;
+	bool		pipeline_ended = false;
+
+	/* 1 pipeline, 3 queries in it */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s",
+				 PQerrorMessage(conn));
+
+	for (i = 0; i < 3; i++)
+	{
+		char	   *param[1];
+
+		param[0] = psprintf("%d", 44 + i);
+
+		if (PQsendQueryParams(conn,
+							  "SELECT generate_series(42, $1)",
+							  1,
+							  NULL,
+							  (const char **) param,
+							  NULL,
+							  NULL,
+							  0) != 1)
+			pg_fatal("failed to send query: %s",
+					 PQerrorMessage(conn));
+		pfree(param[0]);
+	}
+	PQsendPipeline(conn);
+
+	for (i = 0; !pipeline_ended; i++)
+	{
+		bool		first = true;
+		bool		saw_ending_tuplesok;
+		bool		isSingleTuple = false;
+
+		/* Set single row mode for only first 2 SELECT queries */
+		if (i < 2)
+		{
+			if (PQsetSingleRowMode(conn) != 1)
+				pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
+		}
+
+		/* Consume rows for this query */
+		saw_ending_tuplesok = false;
+		while ((res = PQgetResult(conn)) != NULL)
+		{
+			ExecStatusType est = PQresultStatus(res);
+
+			if (est == PGRES_PIPELINE_SYNC)
+			{
+				fprintf(stderr, "end of pipeline reached\n");
+				pipeline_ended = true;
+				PQclear(res);
+				if (i != 3)
+					pg_fatal("Expected three results, got %d", i);
+				break;
+			}
+
+			/* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+			if (first)
+			{
+				if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+					pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
+							 i, PQresStatus(est));
+				if (i >= 2 && est != PGRES_TUPLES_OK)
+					pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
+							 i, PQresStatus(est));
+				first = false;
+			}
+
+			fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+			switch (est)
+			{
+				case PGRES_TUPLES_OK:
+					fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+					saw_ending_tuplesok = true;
+					if (isSingleTuple)
+					{
+						if (PQntuples(res) == 0)
+							fprintf(stderr, "all tuples received in query %d\n", i);
+						else
+							pg_fatal("expected empty PGRES_TUPLES_OK after PGRES_SINGLE_TUPLE, but it contains %d tuples", PQntuples(res));
+					}
+					break;
+
+				case PGRES_SINGLE_TUPLE:
+					isSingleTuple = true;
+					fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+					break;
+
+				default:
+					pg_fatal("unexpected result status %s", PQresStatus(est));
+			}
+			PQclear(res);
+		}
+		if (!pipeline_ended && !saw_ending_tuplesok)
+			pg_fatal("didn't get expected terminating TUPLES_OK");
+	}
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+}
+
+/*
+ * Simple test to verify that a pipeline is discarded as a whole when there's
+ * an error, ignoring transaction commands.
+ */
+static void
+test_transaction(PGconn *conn)
+{
+	PGresult   *res;
+	bool		expect_null;
+	int			num_sends = 0;
+
+	res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
+				 "CREATE TABLE pq_pipeline_tst (id int)");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to create test table: %s",
+				 PQerrorMessage(conn));
+	PQclear(res);
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s",
+				 PQerrorMessage(conn));
+	if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
+		pg_fatal("could not send prepare on pipeline: %s",
+				 PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn,
+						  "BEGIN",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	if (PQsendQueryParams(conn,
+						  "SELECT 0/0",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Send a ROLLBACK using a prepared stmt. This doesn't work because we
+	 * need to get out of the pipeline-aborted state first.
+	 */
+	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+		pg_fatal("failed to execute prepared: %s",
+				 PQerrorMessage(conn));
+
+	/* This insert fails because we're in pipeline-aborted state */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (1)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	PQsendPipeline(conn);
+	num_sends++;
+
+	/*
+	 * This insert fails even though the pipeline got a SYNC, because we're in
+	 * an aborted transaction
+	 */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (2)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	PQsendPipeline(conn);
+	num_sends++;
+
+	/*
+	 * Send ROLLBACK using prepared stmt. This one works because we just did
+	 * PQsendPipeline above.
+	 */
+	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+		pg_fatal("failed to execute prepared: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Now that we're out of a transaction and in pipeline-good mode, this
+	 * insert works
+	 */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (3)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	PQsendPipeline(conn);
+	num_sends++;
+	PQsendPipeline(conn);
+	num_sends++;
+
+	expect_null = false;
+	for (int i = 0;; i++)
+	{
+		ExecStatusType restype;
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+		{
+			printf("%d: got NULL result\n", i);
+			if (!expect_null)
+				pg_fatal("did not expect NULL here");
+			expect_null = false;
+			continue;
+		}
+		restype = PQresultStatus(res);
+		printf("%d: got status %s", i, PQresStatus(restype));
+		if (expect_null)
+			pg_fatal("expected NULL");
+		if (restype == PGRES_FATAL_ERROR)
+			printf("; error: %s", PQerrorMessage(conn));
+		else if (restype == PGRES_PIPELINE_ABORTED)
+		{
+			printf(": command didn't run because pipeline aborted\n");
+		}
+		else
+			printf("\n");
+		PQclear(res);
+
+		if (restype == PGRES_PIPELINE_SYNC)
+			num_sends--;
+		else
+			expect_null = true;
+		if (num_sends <= 0)
+			break;
+	}
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("returned something extra after all the syncs: %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+	/* We expect to find one tuple containing the value "3" */
+	res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("did not get 1 tuple");
+	if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
+		pg_fatal("did not get expected tuple");
+	PQclear(res);
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+usage(const char *progname)
+{
+	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
+	fprintf(stderr, "Usage:\n");
+	fprintf(stderr, "  %s testname [conninfo [number_of_rows]]\n", progname);
+	fprintf(stderr, "Tests:\n");
+	fprintf(stderr, "  disallowed_in_pipeline\n");
+	fprintf(stderr, "  simple_pipeline\n");
+	fprintf(stderr, "  multi_pipeline\n");
+	fprintf(stderr, "  pipeline_abort\n");
+	fprintf(stderr, "  singlerow\n");
+	fprintf(stderr, "  pipeline_insert\n");
+	fprintf(stderr, "  transaction\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo = "";
+	PGconn	   *conn;
+	int			numrows = 10000;
+	PGresult   *res;
+
+	/*
+	 * The testname parameter is mandatory; it can be followed by a conninfo
+	 * string and number of rows.
+	 */
+	if (argc < 2 || argc > 4)
+	{
+		usage(argv[0]);
+		exit(1);
+	}
+
+	if (argc >= 3)
+		conninfo = pg_strdup(argv[2]);
+
+	if (argc >= 4)
+	{
+		errno = 0;
+		numrows = strtol(argv[3], NULL, 10);
+		if (errno != 0 || numrows <= 0)
+		{
+			fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
+			exit(1);
+		}
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+	res = PQexec(conn, "SET lc_messages TO \"C\"");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
+
+	if (strcmp(argv[1], "disallowed_in_pipeline") == 0)
+		test_disallowed(conn);
+	else if (strcmp(argv[1], "simple_pipeline") == 0)
+		test_simple_pipeline(conn);
+	else if (strcmp(argv[1], "multi_pipeline") == 0)
+		test_multi_pipelines(conn);
+	else if (strcmp(argv[1], "pipeline_abort") == 0)
+		test_aborted_pipeline(conn);
+	else if (strcmp(argv[1], "pipeline_insert") == 0)
+		test_pipelined_insert(conn, numrows);
+	else if (strcmp(argv[1], "singlerow") == 0)
+		test_singlerowmode(conn);
+	else if (strcmp(argv[1], "transaction") == 0)
+		test_transaction(conn);
+	else
+	{
+		fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
+		usage(argv[0]);
+		exit(1);
+	}
+
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+	return 0;
+}
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
new file mode 100644
index 0000000000..a12f2dd47b
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -0,0 +1,31 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
+my @tests   = qw(disallowed_in_pipeline
+  simple_pipeline
+  multi_pipeline
+  pipeline_abort
+  pipeline_insert
+  singlerow
+  transaction);
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+
+for my $testname (@tests)
+{
+	$node->command_ok(
+		[ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
+		"libpq_pipeline $testname");
+}
+
+$node->stop('fast');
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 49614106dc..44b1a43f30 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -33,10 +33,11 @@ my @unlink_on_exit;
 
 # Set of variables for modules in contrib/ and src/test/modules/
 my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
-my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
-my @contrib_uselibpgport   = ('oid2name', 'vacuumlo');
-my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
-my $contrib_extralibs      = undef;
+my @contrib_uselibpq =
+  ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
+my @contrib_uselibpgport   = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my $contrib_extralibs      = { 'libpq_pipeline' => ['ws2_32.lib'] };
 my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
 my $contrib_extrasource = {
 	'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 574a8a94fa..ca68e88a72 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1562,10 +1562,12 @@ PG_Locale_Strategy
 PG_Lock_Status
 PG_init_t
 PGcancel
+PGcommandQueueEntry
 PGconn
 PGdataValue
 PGlobjfuncs
 PGnotify
+PGpipelineStatus
 PGresAttDesc
 PGresAttValue
 PGresParamDesc
