On 12/16/16 10:38 AM, Andrew Borodin wrote:
> 2016-12-16 20:17 GMT+05:00 Peter Eisentraut 
> <peter.eisentr...@2ndquadrant.com>:
>>> And one more thing... Can we have BackgroundSessionExecute() split
>>> into two parts: start query and wait for results?
>>> It would allow pg_background to reuse bgsession's code.
>>
>> Yes, I will look into that.
> 
> Thank you. I'm marking both patches as "Waiting for author", keeping
> in mind that pg_background is waiting for bgsessions.
> After updates I'll review these patches.

New patch, mainly with the function split as you requested above, not
much else changed.
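
As a rough illustration of why the split is useful, here is a toy,
self-contained sketch of the "wait for results" half.  It is not code from
the patch: ToyMessage, ToyResult and toy_get_result() are hypothetical
names, and the messages are plain structs rather than FE/BE protocol
messages read from a shm_mq.  It only mirrors the shape of the loop in
BackgroundSessionGetResult(): consume messages until ReadyForQuery ('Z'),
collecting the row description ('T'), data rows ('D') and command tag ('C')
along the way.  Unlike the patch, it treats every other message type as an
error.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for one protocol message. */
typedef struct ToyMessage
{
    char        type;       /* 'T', 'D', 'C' or 'Z' */
    const char *payload;    /* command tag for 'C', status byte for 'Z' */
} ToyMessage;

/* Hypothetical, simplified stand-in for BackgroundSessionResult. */
typedef struct ToyResult
{
    int         has_rowdesc;        /* saw a 'T' message */
    int         nrows;              /* number of 'D' messages */
    const char *command;            /* tag from the 'C' message */
    char        transaction_status; /* byte from the 'Z' message */
    int         error;              /* nonzero on a protocol violation */
} ToyResult;

/*
 * Consume messages until ReadyForQuery ('Z').  Returns the number of
 * messages consumed; res->error is set if the sequence is invalid or
 * ends without a 'Z'.
 */
size_t
toy_get_result(const ToyMessage *msgs, size_t nmsgs, ToyResult *res)
{
    size_t      i;

    memset(res, 0, sizeof(*res));

    for (i = 0; i < nmsgs; i++)
    {
        const ToyMessage *m = &msgs[i];

        switch (m->type)
        {
            case 'T':
                if (res->has_rowdesc)
                {
                    res->error = 1;     /* second row description */
                    return i + 1;
                }
                res->has_rowdesc = 1;
                break;
            case 'D':
                if (!res->has_rowdesc)
                {
                    res->error = 1;     /* data row before description */
                    return i + 1;
                }
                res->nrows++;
                break;
            case 'C':
                res->command = m->payload;
                break;
            case 'Z':
                res->transaction_status = m->payload[0];
                return i + 1;
            default:
                res->error = 1;         /* message type not modeled here */
                return i + 1;
        }
    }

    res->error = 1;                     /* ran out of messages before 'Z' */
    return i;
}
```

Because the receiving side only depends on the message stream, a caller
such as pg_background could send a query, do other work, and run this kind
of loop later.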

For additional entertainment, I include patches that integrate
background sessions into dblink.  So dblink can open a connection to a
background session, and then you can use the existing dblink functions
to send queries, read results, etc.  People use dblink to make
self-connections to get autonomous subsessions, so this would directly
address that use case.  The 0001 patch is some prerequisite refactoring
to remove an ugly macro mess, which is useful independent of this.  0002
is the actual patch.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 95c0499907cfe49ff944dc19900d678e5dd8b5de Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Wed, 28 Dec 2016 12:00:00 -0500
Subject: [PATCH v2] Add background sessions

This adds a C API to run SQL statements in a background worker,
communicating by FE/BE protocol over a DSM message queue.  This can be
used to execute statements and transactions separate from the main
foreground session.

Also included is a PL/Python interface to this functionality.
---
 doc/src/sgml/bgsession.sgml                     | 272 +++++++
 doc/src/sgml/filelist.sgml                      |   1 +
 doc/src/sgml/plpython.sgml                      | 102 +++
 doc/src/sgml/postgres.sgml                      |   1 +
 src/backend/commands/variable.c                 |   5 +
 src/backend/libpq/pqmq.c                        |  26 +-
 src/backend/storage/ipc/shm_mq.c                |  19 +
 src/backend/tcop/Makefile                       |   2 +-
 src/backend/tcop/bgsession.c                    | 911 ++++++++++++++++++++++++
 src/backend/tcop/postgres.c                     |  24 +-
 src/include/commands/variable.h                 |   1 +
 src/include/libpq/pqmq.h                        |   1 +
 src/include/storage/shm_mq.h                    |   3 +
 src/include/tcop/bgsession.h                    |  30 +
 src/include/tcop/tcopprot.h                     |   9 +
 src/pl/plpython/Makefile                        |   2 +
 src/pl/plpython/expected/plpython_bgsession.out | 188 +++++
 src/pl/plpython/expected/plpython_test.out      |   7 +-
 src/pl/plpython/plpy_bgsession.c                | 454 ++++++++++++
 src/pl/plpython/plpy_bgsession.h                |  18 +
 src/pl/plpython/plpy_main.h                     |   3 +
 src/pl/plpython/plpy_planobject.c               |   1 +
 src/pl/plpython/plpy_planobject.h               |   2 +
 src/pl/plpython/plpy_plpymodule.c               |   5 +
 src/pl/plpython/plpy_spi.c                      |   7 +-
 src/pl/plpython/plpy_spi.h                      |   3 +
 src/pl/plpython/sql/plpython_bgsession.sql      | 148 ++++
 27 files changed, 2224 insertions(+), 21 deletions(-)
 create mode 100644 doc/src/sgml/bgsession.sgml
 create mode 100644 src/backend/tcop/bgsession.c
 create mode 100644 src/include/tcop/bgsession.h
 create mode 100644 src/pl/plpython/expected/plpython_bgsession.out
 create mode 100644 src/pl/plpython/plpy_bgsession.c
 create mode 100644 src/pl/plpython/plpy_bgsession.h
 create mode 100644 src/pl/plpython/sql/plpython_bgsession.sql

diff --git a/doc/src/sgml/bgsession.sgml b/doc/src/sgml/bgsession.sgml
new file mode 100644
index 0000000000..ba5cb409d6
--- /dev/null
+++ b/doc/src/sgml/bgsession.sgml
@@ -0,0 +1,272 @@
+<!-- doc/src/sgml/bgsession.sgml -->
+
+<chapter id="bgsession">
+ <title>Background Session API</title>
+
+ <para>
+  The background session API is a C API for creating additional database
+  sessions in the background and running SQL statements in them.  A background
+  session behaves like a normal (foreground) session in that it has session
+  state, transactions, can run SQL statements, and so on.  Unlike a foreground
+  session, it is not connected directly to a client.  Instead the foreground
+  session can use this API to execute SQL statements and retrieve their
+  results.  Higher-level integrations, such as in procedural languages, can
+  make this functionality available to clients.  Background sessions are
+  independent from their foreground sessions in their session and transaction
+  state.  So a background session cannot see uncommitted data in foreground
+  sessions or vice versa, and there is no preferential treatment with respect
+  to locking.  Like all sessions, background sessions are separate processes.
+  Foreground and background sessions communicate over shared memory message
+  queues instead of the sockets that a client/server connection uses.
+ </para>
+
+ <para>
+  Background sessions can be useful in a variety of scenarios when effects
+  that are independent of the foreground session are to be achieved, for
+  example:
+  <itemizedlist>
+   <listitem>
+    <para>
+     Commit data independent of whether a foreground transaction commits, for
+     example for auditing.  A trigger in the foreground session could effect
+     the necessary writes via a background session.
+    </para>
+   </listitem>
+   <listitem>
+    <para>
+     Large changes can be split up into smaller transactions.  A foreground
+     session can orchestrate the logic, for example in a function, while the
+     actual writes and commits are executed in a background session.
+    </para>
+   </listitem>
+  </itemizedlist>
+ </para>
+
+ <sect1 id="bgsession-types">
+  <title>API</title>
+
+  <para>
+   Use <literal>#include "tcop/bgsession.h"</literal> to include the API
+   declarations.
+  </para>
+
+  <sect2>
+   <title>Types</title>
+
+   <variablelist>
+    <varlistentry>
+     <term><type>BackgroundSession</type></term>
+     <listitem>
+      <para>
+       An opaque connection handle.  Multiple background sessions can exist at
+       the same time, and each is represented by an instance of this type.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionPreparedStatement</type></term>
+     <listitem>
+      <para>
+       An opaque handle for a prepared statement, created when the statement
+       is prepared and used when the statement is executed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionResult</type></term>
+     <listitem>
+      <para>
+       A handle for a query result, defined as:
+<programlisting>
+typedef struct BackgroundSessionResult
+{
+    TupleDesc   tupdesc;
+    List       *tuples;
+    const char *command;
+} BackgroundSessionResult;
+</programlisting>
+       <structfield>tupdesc</structfield> describes the result
+       rows, or is <symbol>NULL</symbol> if the command does not return
+       rows.  <structfield>tuples</structfield> is a list
+       of <type>HeapTuple</type>s with the result
+       rows.  <structfield>command</structfield> is the tag of the executed
+       command.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2>
+   <title>Functions</title>
+
+   <variablelist>
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSession *<function>BackgroundSessionStart</function></funcdef>
+        <paramdef>void</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Creates a background session.  This starts the background worker and
+       establishes a connection to it.
+      </para>
+
+      <para>
+       A background session does not automatically end when the foreground
+       transaction or session ends.
+       Use <function>BackgroundSessionEnd()</function> to end a background
+       session.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionEnd</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Ends a background session.  This closes the connection to the
+       background worker.
+      </para>
+
+      <para>
+       It is an error to close a background session with a transaction block
+       open.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecute</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement and return the result.  Access the fields of
+       the result structure to get details about the command result.  If there
+       is an error, this function does not return.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionSend</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement, but don't wait for the result.  The result
+       can then be fetched later
+       with <function>BackgroundSessionGetResult()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionGetResult</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Obtain the result of an SQL statement previously sent
+       with <function>BackgroundSessionSend()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionPreparedStatement *<function>BackgroundSessionPrepare</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Oid <parameter>argtypes</parameter>[]</paramdef>
+        <paramdef>const char *<parameter>argnames</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Prepare an SQL statement for later execution
+       by <function>BackgroundSessionExecutePrepared()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecutePrepared</function></funcdef>
+        <paramdef>BackgroundSessionPreparedStatement *<parameter>stmt</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Datum <parameter>values</parameter>[]</paramdef>
+        <paramdef>bool <parameter>nulls</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute a statement previously prepared by
+       <function>BackgroundSessionPrepare()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+
+   <para>
+    Here is a very simple example:
+
+<programlisting>
+#include "tcop/bgsession.h"
+
+void
+myfunc(void)
+{
+    BackgroundSession *session;
+    BackgroundSessionResult *result;
+
+    session = BackgroundSessionStart();
+
+    result = BackgroundSessionExecute(session, "SELECT ...");
+    elog(INFO, "returned %d rows", list_length(result->tuples));
+
+    BackgroundSessionEnd(session);
+}
+</programlisting>
+   </para>
+  </sect2>
+ </sect1>
+</chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 69649a7da4..7531c8f430 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -52,6 +52,7 @@
 <!ENTITY wal           SYSTEM "wal.sgml">
 
 <!-- programmer's guide -->
+<!ENTITY bgsession  SYSTEM "bgsession.sgml">
 <!ENTITY bgworker   SYSTEM "bgworker.sgml">
 <!ENTITY dfunc      SYSTEM "dfunc.sgml">
 <!ENTITY ecpg       SYSTEM "ecpg.sgml">
diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index 46397781be..53a2e8f9b9 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -1357,6 +1357,108 @@ <title>Older Python Versions</title>
   </sect2>
  </sect1>
 
+ <sect1 id="plpython-bgsession">
+  <title>Background Sessions</title>
+
+  <para>
+   PL/Python exposes the background session interface described
+   in <xref linkend="bgsession"> as a Python API.  A background session is
+   represented by a Python object of type <type>plpy.BackgroundSession</type>.
+   It has these methods:
+
+   <variablelist>
+    <varlistentry>
+     <term><literal>.close()</literal></term>
+     <listitem>
+      <para>
+       Close the session.  This must be called or the session will leak.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute(<parameter>sql</parameter>)</literal></term>
+     <listitem>
+      <para>
+       Execute an SQL statement.  The return value is a result object of the
+       same type as returned by <literal>plpy.execute</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.prepare(<parameter>sql</parameter>, <replaceable>types</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Prepare an SQL statement.  The parameter data types are specified as
+       for <literal>plpy.prepare</literal>.  The return value is a plan
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute_prepared(<parameter>plan</parameter>, <replaceable>values</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Execute a prepared statement.  The parameter values are specified as
+       for <literal>plpy.execute</literal>.  The return value is a result
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__enter__()</literal></term>
+     <listitem>
+      <para>
+       This just returns self.  It is supplied so that
+       a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  See example below.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__exit__(<parameter>exc_type</parameter>, <parameter>exc</parameter>, <parameter>tb</parameter>)</literal></term>
+     <listitem>
+      <para>
+       This is the same as <literal>.close()</literal>.  It is supplied so
+       that a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  (The arguments are ignored.)
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+  <para>
+   A <classname>BackgroundSession</classname> object can be preserved across
+   function calls or passed between functions via the <varname>SD</varname>
+   and <varname>GD</varname> variables.
+  </para>
+
+  <para>
+   Example:
+<programlisting>
+CREATE FUNCTION bulkload() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(1, 1001):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 100 == 0:
+            a.execute("COMMIT")
+
+return 0
+$$;
+</programlisting>
+   This function inserts 1000 values into the table and commits after every
+   100 values.
+  </para>
+ </sect1>
+
  <sect1 id="plpython-util">
   <title>Utility Functions</title>
   <para>
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 9143917c49..e281002f04 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -220,6 +220,7 @@ <title>Server Programming</title>
 
   &spi;
   &bgworker;
+  &bgsession;
   &logicaldecoding;
   &replication-origins;
 
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa54b2..a522c693b9 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -674,12 +674,17 @@ show_random_seed(void)
  * SET CLIENT_ENCODING
  */
 
+void (*check_client_encoding_hook)(void);
+
 bool
 check_client_encoding(char **newval, void **extra, GucSource source)
 {
 	int			encoding;
 	const char *canonical_name;
 
+	if (check_client_encoding_hook)
+		check_client_encoding_hook();
+
 	/* Look up the encoding by name */
 	encoding = pg_valid_client_encoding(*newval);
 	if (encoding < 0)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index f93ccae148..62955d10e6 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -48,6 +48,11 @@ static PQcommMethods PqCommMqMethods = {
 	mq_endcopyout
 };
 
+static PQcommMethods *save_PqCommMethods;
+static CommandDest save_whereToSendOutput;
+static ProtocolVersion save_FrontendProtocol;
+static dsm_segment *save_seg;
+
 /*
  * Arrange to redirect frontend/backend protocol messages to a shared-memory
  * message queue.
@@ -55,12 +60,30 @@ static PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+	save_PqCommMethods = PqCommMethods;
+	save_whereToSendOutput = whereToSendOutput;
+	save_FrontendProtocol = FrontendProtocol;
+
 	PqCommMethods = &PqCommMqMethods;
 	pq_mq = shm_mq_get_queue(mqh);
 	pq_mq_handle = mqh;
 	whereToSendOutput = DestRemote;
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+
+	save_seg = seg;
+}
+
+void
+pq_stop_redirect_to_shm_mq(void)
+{
+	cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+	PqCommMethods = save_PqCommMethods;
+	whereToSendOutput = save_whereToSendOutput;
+	FrontendProtocol = save_FrontendProtocol;
+	pq_mq = NULL;
+	pq_mq_handle = NULL;
+	save_seg = NULL;
 }
 
 /*
@@ -182,7 +205,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
 	if (result != SHM_MQ_SUCCESS)
-		return EOF;
+		ereport(COMMERROR,
+				(errmsg("could not send on message queue: %s", shm_mq_strerror(result))));
 	return 0;
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index bfb67038ad..365eb32d25 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -805,6 +805,25 @@ shm_mq_get_queue(shm_mq_handle *mqh)
 }
 
 /*
+ * Get error message string.
+ */
+const char *
+shm_mq_strerror(shm_mq_result res)
+{
+	switch (res)
+	{
+		case SHM_MQ_SUCCESS:
+			return gettext_noop("Success");
+		case SHM_MQ_WOULD_BLOCK:
+			return gettext_noop("Operation would block");
+		case SHM_MQ_DETACHED:
+			return gettext_noop("Other process has detached queue");
+		default:
+			return gettext_noop("Unknown error");
+	}
+}
+
+/*
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile
index 674302feb7..ab267673a3 100644
--- a/src/backend/tcop/Makefile
+++ b/src/backend/tcop/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/tcop
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS= dest.o fastpath.o postgres.o pquery.o utility.o
+OBJS= bgsession.o dest.o fastpath.o postgres.o pquery.o utility.o
 
 ifneq (,$(filter $(PORTNAME),cygwin win32))
 override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
new file mode 100644
index 0000000000..486819b696
--- /dev/null
+++ b/src/backend/tcop/bgsession.c
@@ -0,0 +1,911 @@
+/*--------------------------------------------------------------------------
+ *
+ * bgsession.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/tcop/bgsession.c
+ *
+ *
+ * This implements a C API to open a background session and run SQL queries
+ * in it.  The session looks much like a normal database connection, but it is
+ * always to the same database, and there is no authentication needed.  The
+ * "backend" for that connection is a background worker.  The normal backend
+ * and the background session worker communicate over the normal FE/BE
+ * protocol.
+ *
+ * Types:
+ *
+ * BackgroundSession -- opaque connection handle
+ * BackgroundSessionPreparedStatement -- opaque prepared statement handle
+ * BackgroundSessionResult -- query result
+ *
+ * Functions:
+ *
+ * BackgroundSessionStart() -- start a session (launches background worker)
+ * and return a handle
+ *
+ * BackgroundSessionEnd() -- close session and free resources
+ *
+ * BackgroundSessionExecute() -- run SQL string and return result (rows or
+ * status)
+ *
+ * BackgroundSessionSend() -- run SQL string without waiting for result
+ *
+ * BackgroundSessionGetResult() -- get result from prior ...Send()
+ *
+ * BackgroundSessionPrepare() -- prepare an SQL string for subsequent
+ * execution
+ *
+ * BackgroundSessionExecutePrepared() -- run prepared statement
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/tupdesc.h"
+#include "access/xact.h"
+#include "commands/async.h"
+#include "commands/variable.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/bgsession.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define BGSESSION_MAGIC					0x50674267
+
+#define BGSESSION_KEY_FIXED_DATA		0
+#define BGSESSION_KEY_GUC				1
+#define BGSESSION_KEY_COMMAND_QUEUE		2
+#define BGSESSION_KEY_RESPONSE_QUEUE	3
+#define BGSESSION_NKEYS					4
+
+#define BGSESSION_QUEUE_SIZE			16384
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct background_session_fixed_data
+{
+	Oid database_id;
+	Oid authenticated_user_id;
+	Oid current_user_id;
+	int sec_context;
+} background_session_fixed_data;
+
+struct BackgroundSession
+{
+	ResourceOwner resowner;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *worker_handle;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	int		transaction_status;
+};
+
+struct BackgroundSessionPreparedStatement
+{
+	BackgroundSession *session;
+	Oid		   *argtypes;
+	TupleDesc	tupdesc;
+};
+
+static void bgsession_worker_main(Datum main_arg);
+static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg);
+static void bgsession_check_client_encoding_hook(void);
+static TupleDesc TupleDesc_from_RowDescription(StringInfo msg);
+static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg);
+static void forward_NotifyResponse(StringInfo msg);
+static void rethrow_errornotice(StringInfo msg);
+static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
+
+
+BackgroundSession *
+BackgroundSessionStart(void)
+{
+	ResourceOwner oldowner;
+	BackgroundWorker worker;
+	pid_t		pid;
+	BackgroundSession *session;
+	shm_toc_estimator e;
+	Size		segsize;
+	Size		guc_len;
+	char	   *gucstate;
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	BgwHandleStatus bgwstatus;
+	StringInfoData msg;
+	char		msgtype;
+
+	session = palloc(sizeof(*session));
+
+	session->resowner = ResourceOwnerCreate(NULL, "background session");
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(background_session_fixed_data));
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_keys(&e, BGSESSION_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	oldowner = CurrentResourceOwner;
+	PG_TRY();
+	{
+		CurrentResourceOwner = session->resowner;
+		seg = dsm_create(segsize, 0);
+	}
+	PG_CATCH();
+	{
+		CurrentResourceOwner = oldowner;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	CurrentResourceOwner = oldowner;
+
+	session->seg = seg;
+
+	toc = shm_toc_create(BGSESSION_MAGIC, dsm_segment_address(seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(*fdata));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	shm_toc_insert(toc, BGSESSION_KEY_FIXED_DATA, fdata);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, BGSESSION_KEY_GUC, gucstate);
+
+	command_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+							   BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_COMMAND_QUEUE, command_mq);
+	shm_mq_set_sender(command_mq, MyProc);
+
+	response_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+								BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
+	shm_mq_set_receiver(response_mq, MyProc);
+
+	session->command_qh = shm_mq_attach(command_mq, seg, NULL);
+	session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = bgsession_worker_main;
+	snprintf(worker.bgw_name, BGW_MAXLEN, "background session by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You might need to increase max_worker_processes.")));
+
+	shm_mq_set_handle(session->command_qh, session->worker_handle);
+	shm_mq_set_handle(session->response_qh, session->worker_handle);
+
+	bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+	if (bgwstatus != BGWH_STARTED)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not start background worker")));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return session;
+}
+
+
+void
+BackgroundSessionEnd(BackgroundSession *session)
+{
+	StringInfoData msg;
+
+	if (session->transaction_status == 'T')
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("background session ended with transaction block open")));
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'X');
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	pfree(session->worker_handle);
+	dsm_detach(session->seg);
+	ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
+	ResourceOwnerDelete(session->resowner);
+	pfree(session);
+}
+
+
+void
+BackgroundSessionSend(BackgroundSession *session, const char *sql)
+{
+	StringInfoData msg;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'Q');
+	pq_sendstring(&msg, sql);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionGetResult(BackgroundSession *session)
+{
+	StringInfoData msg;
+	char		msgtype;
+	BackgroundSessionResult *result;
+
+	result = palloc0(sizeof(*result));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!result->tupdesc)
+					elog(ERROR, "no T before D");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(result->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecute(BackgroundSession *session, const char *sql)
+{
+	BackgroundSessionSend(session, sql);
+	return BackgroundSessionGetResult(session);
+}
+
+
+BackgroundSessionPreparedStatement *
+BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs,
+						 Oid argtypes[], const char *argnames[])
+{
+	BackgroundSessionPreparedStatement *result;
+	StringInfoData msg;
+	int			i;
+	char		msgtype;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'P');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, sql);
+	pq_sendint(&msg, nargs, 2);
+	for (i = 0; i < nargs; i++)
+		pq_sendint(&msg, argtypes[i], 4);
+	if (argnames)
+		for (i = 0; i < nargs; i++)
+			pq_sendstring(&msg, argnames[i]);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->session = session;
+	result->argtypes = palloc(nargs * sizeof(*result->argtypes));
+	memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes));
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '1':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'D');
+	pq_sendbyte(&msg, 'S');
+	pq_sendstring(&msg, "");
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'E':
+				rethrow_errornotice(&msg);
+				break;
+			case 'n':
+				break;
+			case 't':
+				/* ParameterDescription: ignore for now */
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'n' && msgtype != 'T');
+
+	return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[])
+{
+	BackgroundSession *session;
+	StringInfoData msg;
+	BackgroundSessionResult *result;
+	char		msgtype;
+	int			i;
+
+	session = stmt->session;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'B');
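+	pq_sendstring(&msg, "");  /* destination portal: unnamed */
+	pq_sendstring(&msg, "");  /* source prepared statement: unnamed */
+	pq_sendint(&msg, 1, 2);  /* number of parameter format codes */
+	pq_sendint(&msg, 1, 2);  /* format code: binary */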
+	pq_sendint(&msg, nargs, 2);  /* number of parameter values */
+	for (i = 0; i < nargs; i++)
+	{
+		if (nulls[i])
+			pq_sendint(&msg, -1, 4);
+		else
+		{
+			Oid			typsend;
+			bool		typisvarlena;
+			bytea	   *outputbytes;
+
+			getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena);
+			outputbytes = OidSendFunctionCall(typsend, values[i]);
+			pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4);
+			pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
+			pfree(outputbytes);
+		}
+	}
+	pq_sendint(&msg, 1, 2);  /* number of result column format codes */
+	pq_sendint(&msg, 1, 2);  /* format code: binary */
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '2':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'E');
+	pq_sendstring(&msg, "");  /* unnamed portal */
+	pq_sendint(&msg, 0, 4);  /* max rows: 0 means no limit */
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->tupdesc = stmt->tupdesc;
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!stmt->tupdesc)
+					elog(ERROR, "did not expect any rows");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(stmt->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'C');
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_putemptymessage('S');
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case 'A':
+			forward_NotifyResponse(&msg);
+			break;
+		case 'Z':
+			session->transaction_status = pq_getmsgbyte(&msg);
+			pq_getmsgend(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	return result;
+}
+
+
+static void
+bgsession_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	char	   *gucstate;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	StringInfoData msg;
+	char		msgtype;
+
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "background session worker");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "background session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(BGSESSION_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, BGSESSION_KEY_FIXED_DATA);
+
+	gucstate = shm_toc_lookup(toc, BGSESSION_KEY_GUC);
+
+	command_mq = shm_toc_lookup(toc, BGSESSION_KEY_COMMAND_QUEUE);
+	shm_mq_set_receiver(command_mq, MyProc);
+	command_qh = shm_mq_attach(command_mq, seg, NULL);
+
+	response_mq = shm_toc_lookup(toc, BGSESSION_KEY_RESPONSE_QUEUE);
+	shm_mq_set_sender(response_mq, MyProc);
+	response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	pq_redirect_to_shm_mq(seg, response_qh);
+	BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
+											  fdata->authenticated_user_id);
+
+	SetClientEncoding(GetDatabaseEncoding());
+
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	process_session_preload_libraries();
+
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	whereToSendOutput = DestRemote;
+	ReadyForQuery(whereToSendOutput);
+
+	MessageContext = AllocSetContextCreate(TopMemoryContext,
+										   "MessageContext",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+
+	do
+	{
+		MemoryContextSwitchTo(MessageContext);
+		MemoryContextResetAndDeleteChildren(MessageContext);
+
+		ProcessCompletedNotifies();
+		pgstat_report_stat(false);
+		pgstat_report_activity(STATE_IDLE, NULL);
+
+		shm_mq_receive_stringinfo(command_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'B':
+				{
+					SetCurrentStatementStartTimestamp();
+					exec_bind_message(&msg);
+					break;
+				}
+			case 'D':
+				{
+					int         describe_type;
+					const char *describe_target;
+
+					SetCurrentStatementStartTimestamp();
+
+					describe_type = pq_getmsgbyte(&msg);
+					describe_target = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					switch (describe_type)
+					{
+						case 'S':
+							exec_describe_statement_message(describe_target);
+							break;
+#ifdef TODO
+						case 'P':
+							exec_describe_portal_message(describe_target);
+							break;
+#endif
+						default:
+							ereport(ERROR,
+									(errcode(ERRCODE_PROTOCOL_VIOLATION),
+									 errmsg("invalid DESCRIBE message subtype %d",
+											describe_type)));
+							break;
+					}
+				}
+				break;
+			case 'E':
+				{
+					const char *portal_name;
+					int			max_rows;
+
+					SetCurrentStatementStartTimestamp();
+
+					portal_name = pq_getmsgstring(&msg);
+					max_rows = pq_getmsgint(&msg, 4);
+					pq_getmsgend(&msg);
+
+					exec_execute_message(portal_name, max_rows);
+				}
+				break;
+
+			case 'P':
+				{
+					const char *stmt_name;
+					const char *query_string;
+					int			numParams;
+					Oid		   *paramTypes = NULL;
+
+					SetCurrentStatementStartTimestamp();
+
+					stmt_name = pq_getmsgstring(&msg);
+					query_string = pq_getmsgstring(&msg);
+					numParams = pq_getmsgint(&msg, 2);
+					if (numParams > 0)
+					{
+						int			i;
+
+						paramTypes = palloc(numParams * sizeof(Oid));
+						for (i = 0; i < numParams; i++)
+							paramTypes[i] = pq_getmsgint(&msg, 4);
+					}
+					pq_getmsgend(&msg);
+
+					exec_parse_message(query_string, stmt_name,
+									   paramTypes, numParams);
+					break;
+				}
+			case 'Q':
+				{
+					const char *sql;
+					int save_log_statement;
+					bool save_log_duration;
+					int save_log_min_duration_statement;
+
+					sql = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					/* XXX room for improvement */
+					save_log_statement = log_statement;
+					save_log_duration = log_duration;
+					save_log_min_duration_statement = log_min_duration_statement;
+
+					check_client_encoding_hook = bgsession_check_client_encoding_hook;
+					log_statement = LOGSTMT_NONE;
+					log_duration = false;
+					log_min_duration_statement = -1;
+
+					SetCurrentStatementStartTimestamp();
+					exec_simple_query(sql, 1);	/* 1 = binary result format */
+
+					log_statement = save_log_statement;
+					log_duration = save_log_duration;
+					log_min_duration_statement = save_log_min_duration_statement;
+					check_client_encoding_hook = NULL;
+
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'S':
+				{
+					pq_getmsgend(&msg);
+					finish_xact_command();
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'X':
+				break;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid protocol message type from background session leader: %c",
+								msgtype)));
+				break;
+		}
+	}
+	while (msgtype != 'X');
+}
+
+
+static void
+shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg)
+{
+	shm_mq_result res;
+	Size		nbytes;
+	void	   *data;
+
+	res = shm_mq_receive(qh, &nbytes, &data, false);
+	if (res != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not read from message queue: %s", shm_mq_strerror(res))));
+
+	initStringInfo(msg);
+	appendBinaryStringInfo(msg, data, nbytes);
+}
+
+
+static void
+bgsession_check_client_encoding_hook(void)
+{
+	elog(ERROR, "cannot set client encoding in background session");
+}
+
+
+static TupleDesc
+TupleDesc_from_RowDescription(StringInfo msg)
+{
+	TupleDesc	tupdesc;
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+
+	tupdesc = CreateTemplateTupleDesc(natts, false);
+	for (i = 0; i < natts; i++)
+	{
+		const char *colname;
+		Oid     type_oid;
+		int32	typmod;
+		int16	format;
+
+		colname = pq_getmsgstring(msg);
+		(void) pq_getmsgint(msg, 4);   /* table OID */
+		(void) pq_getmsgint(msg, 2);   /* table attnum */
+		type_oid = pq_getmsgint(msg, 4);
+		(void) pq_getmsgint(msg, 2);   /* type length */
+		typmod = pq_getmsgint(msg, 4);
+		format = pq_getmsgint(msg, 2);
+		(void) format;
+#ifdef TODO
+		/* XXX The protocol sometimes sends 0 (text) if the format is not
+		 * determined yet.  We always use binary, so this check is probably
+		 * not useful. */
+		if (format != 1)
+			elog(ERROR, "format must be binary");
+#endif
+
+		TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0);
+	}
+	return tupdesc;
+}
+
+
+static HeapTuple
+HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg)
+{
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+	Datum	   *values;
+	bool	   *nulls;
+	StringInfoData buf;
+
+	Assert(tupdesc);
+
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+
+	values = palloc(natts * sizeof(*values));
+	nulls = palloc(natts * sizeof(*nulls));
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; i++)
+	{
+		int32 len = pq_getmsgint(msg, 4);
+
+		if (len < 0)
+			nulls[i] = true;
+		else
+		{
+			Oid recvid;
+			Oid typioparams;
+
+			nulls[i] = false;
+
+			getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid,
+								   &recvid,
+								   &typioparams);
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len);
+			values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams,
+											   tupdesc->attrs[i]->atttypmod);
+		}
+	}
+
+	return heap_form_tuple(tupdesc, values, nulls);
+}
+
+
+static void
+forward_NotifyResponse(StringInfo msg)
+{
+	int32	pid;
+	const char *channel;
+	const char *payload;
+
+	pid = pq_getmsgint(msg, 4);
+	channel = pq_getmsgrawstring(msg);
+	payload = pq_getmsgrawstring(msg);
+	pq_getmsgend(msg);
+
+	NotifyMyFrontEnd(channel, payload, pid);
+}
+
+
+static void
+rethrow_errornotice(StringInfo msg)
+{
+	ErrorData   edata;
+
+	pq_parse_errornotice(msg, &edata);
+	edata.elevel = Min(edata.elevel, ERROR);
+	ThrowErrorData(&edata);
+}
+
+
+static void
+invalid_protocol_message(char msgtype)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid protocol message type from background session: %c",
+					msgtype)));
+}
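
For readers following HeapTuple_from_DataRow() above, here is a minimal
standalone sketch of the DataRow body layout it consumes: an int16 field
count, then for each field an int32 length (-1 for NULL) followed by that
many bytes.  This is not part of the patch; the names DemoField,
read_be16, read_be32, and demo_parse_datarow are hypothetical, and the
code only locates the field bytes rather than calling type receive
functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical illustration only; none of these names exist in the patch. */

/* Read a 16-bit integer in network (big-endian) byte order. */
static uint16_t
read_be16(const unsigned char *p)
{
	return (uint16_t) ((p[0] << 8) | p[1]);
}

/* Read a signed 32-bit integer in network (big-endian) byte order. */
static int32_t
read_be32(const unsigned char *p)
{
	uint32_t	v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
					((uint32_t) p[2] << 8) | (uint32_t) p[3];

	return (int32_t) v;
}

typedef struct DemoField
{
	int32_t		len;			/* -1 means SQL NULL */
	const unsigned char *data;	/* points into the body; NULL if len < 0 */
} DemoField;

/*
 * Parse a DataRow body (the bytes after the 'D' type byte and length word).
 * Returns the number of fields, or -1 if the body is malformed or has more
 * than max_fields fields.
 */
static int
demo_parse_datarow(const unsigned char *body, size_t body_len,
				   DemoField *fields, int max_fields)
{
	size_t		pos;
	int			natts;
	int			i;

	assert(body != NULL && fields != NULL);

	if (body_len < 2)
		return -1;
	natts = read_be16(body);
	pos = 2;
	if (natts > max_fields)
		return -1;

	for (i = 0; i < natts; i++)
	{
		int32_t		len;

		if (body_len - pos < 4)
			return -1;
		len = read_be32(body + pos);
		pos += 4;

		if (len < 0)
		{
			fields[i].len = -1;
			fields[i].data = NULL;
			continue;
		}
		if (body_len - pos < (size_t) len)
			return -1;
		fields[i].len = len;
		fields[i].data = body + pos;
		pos += (size_t) len;
	}

	/* Like pq_getmsgend(): leftover bytes mean a malformed message. */
	if (pos != body_len)
		return -1;
	return natts;
}
```

In the patch itself the per-field bytes are handed to
OidReceiveFunctionCall() with the column's binary receive function; the
sketch stops one step short of that.
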
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b17923106a..e5f34c9d5c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -180,8 +180,6 @@ static int	errdetail_execute(List *raw_parsetree_list);
 static int	errdetail_params(ParamListInfo params);
 static int	errdetail_abort(void);
 static int	errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
 static bool IsTransactionExitStmt(Node *parsetree);
 static bool IsTransactionExitStmtList(List *parseTrees);
 static bool IsTransactionStmtList(List *parseTrees);
@@ -869,8 +867,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
  *
  * Execute a "simple Query" protocol message.
  */
-static void
-exec_simple_query(const char *query_string)
+void
+exec_simple_query(const char *query_string, int16 format)
 {
 	CommandDest dest = whereToSendOutput;
 	MemoryContext oldcontext;
@@ -963,7 +961,6 @@ exec_simple_query(const char *query_string)
 				   *plantree_list;
 		Portal		portal;
 		DestReceiver *receiver;
-		int16		format;
 
 		/*
 		 * Get the command name for use in status display (it also becomes the
@@ -1054,6 +1051,8 @@ exec_simple_query(const char *query_string)
 		 */
 		PortalStart(portal, NULL, 0, InvalidSnapshot);
 
+		if (format < 0)
+		{
 		/*
 		 * Select the appropriate output format: text unless we are doing a
 		 * FETCH from a binary cursor.  (Pretty grotty to have to do this here
@@ -1074,6 +1073,7 @@ exec_simple_query(const char *query_string)
 					format = 1; /* BINARY */
 			}
 		}
+		}
 		PortalSetResultFormat(portal, 1, &format);
 
 		/*
@@ -1185,7 +1185,7 @@ exec_simple_query(const char *query_string)
  *
  * Execute a "Parse" protocol message.
  */
-static void
+void
 exec_parse_message(const char *query_string,	/* string to execute */
 				   const char *stmt_name,		/* name for prepared stmt */
 				   Oid *paramTypes,		/* parameter types */
@@ -1447,7 +1447,7 @@ exec_parse_message(const char *query_string,	/* string to execute */
  *
  * Process a "Bind" message to create a portal from a prepared statement
  */
-static void
+void
 exec_bind_message(StringInfo input_message)
 {
 	const char *portal_name;
@@ -1829,7 +1829,7 @@ exec_bind_message(StringInfo input_message)
  *
  * Process an "Execute" message for a portal
  */
-static void
+void
 exec_execute_message(const char *portal_name, long max_rows)
 {
 	CommandDest dest;
@@ -2278,7 +2278,7 @@ errdetail_recovery_conflict(void)
  *
  * Process a "Describe" message for a prepared statement
  */
-static void
+void
 exec_describe_statement_message(const char *stmt_name)
 {
 	CachedPlanSource *psrc;
@@ -2422,7 +2422,7 @@ exec_describe_portal_message(const char *portal_name)
 /*
  * Convenience routines for starting/committing a single command.
  */
-static void
+void
 start_xact_command(void)
 {
 	if (!xact_started)
@@ -2440,7 +2440,7 @@ start_xact_command(void)
 	}
 }
 
-static void
+void
 finish_xact_command(void)
 {
 	if (xact_started)
@@ -4069,7 +4069,7 @@ PostgresMain(int argc, char *argv[],
 					if (am_walsender)
 						exec_replication_command(query_string);
 					else
-						exec_simple_query(query_string);
+						exec_simple_query(query_string, -1);
 
 					send_ready_for_query = true;
 				}
diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h
index 81059515da..73faff7671 100644
--- a/src/include/commands/variable.h
+++ b/src/include/commands/variable.h
@@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s
 extern bool check_random_seed(double *newval, void **extra, GucSource source);
 extern void assign_random_seed(double newval, void *extra);
 extern const char *show_random_seed(void);
+extern void (*check_client_encoding_hook)(void);
 extern bool check_client_encoding(char **newval, void **extra, GucSource source);
 extern void assign_client_encoding(const char *newval, void *extra);
 extern bool check_session_authorization(char **newval, void **extra, GucSource source);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index 8c03acb308..6cc00909bf 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
+extern void pq_stop_redirect_to_shm_mq(void);
 extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7ea25397b9..64d3ae65d8 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -79,6 +79,9 @@ extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 
+/* Get error message string. */
+extern const char *shm_mq_strerror(shm_mq_result res);
+
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
new file mode 100644
index 0000000000..71415a61f0
--- /dev/null
+++ b/src/include/tcop/bgsession.h
@@ -0,0 +1,30 @@
+#ifndef BGSESSION_H
+#define BGSESSION_H
+
+#include "access/tupdesc.h"
+#include "nodes/pg_list.h"
+
+struct BackgroundSession;
+typedef struct BackgroundSession BackgroundSession;
+
+struct BackgroundSessionPreparedStatement;
+typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement;
+
+typedef struct BackgroundSessionResult
+{
+	TupleDesc	tupdesc;
+	List	   *tuples;
+	const char *command;
+} BackgroundSessionResult;
+
+BackgroundSession *BackgroundSessionStart(void);
+void BackgroundSessionEnd(BackgroundSession *session);
+
+void BackgroundSessionSend(BackgroundSession *session, const char *sql);
+BackgroundSessionResult *BackgroundSessionGetResult(BackgroundSession *session);
+BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql);
+
+BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]);
+BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]);
+
+#endif /* BGSESSION_H */
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 7254355862..8f2ff8adf9 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -57,6 +57,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions,
 			  ParamListInfo boundParams);
 extern List *pg_plan_queries(List *querytrees, int cursorOptions,
 				ParamListInfo boundParams);
+extern void exec_simple_query(const char *query_string, int16 format);
+extern void exec_parse_message(const char *query_string, const char *stmt_name,
+							   Oid paramTypes[], int numParams);
+extern void exec_bind_message(StringInfo input_message);
+extern void exec_execute_message(const char *portal_name, long max_rows);
+extern void exec_describe_statement_message(const char *stmt_name);
 
 extern bool check_max_stack_depth(int *newval, void **extra, GucSource source);
 extern void assign_max_stack_depth(int newval, void *extra);
@@ -70,6 +76,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
 extern void ProcessClientReadInterrupt(bool blocked);
 extern void ProcessClientWriteInterrupt(bool blocked);
 
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
+
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
 extern void PostgresMain(int argc, char *argv[],
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index 7680d49cb6..9895a6e869 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language"
 NAME = plpython$(python_majorversion)
 
 OBJS = \
+	plpy_bgsession.o \
 	plpy_cursorobject.o \
 	plpy_elog.o \
 	plpy_exec.o \
@@ -89,6 +90,7 @@ REGRESS = \
 	plpython_quote \
 	plpython_composite \
 	plpython_subtransaction \
+	plpython_bgsession \
 	plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_bgsession.out b/src/pl/plpython/expected/plpython_bgsession.out
new file mode 100644
index 0000000000..b8dff07071
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_bgsession.out
@@ -0,0 +1,188 @@
+CREATE TABLE test1 (a int, b text);
+CREATE FUNCTION bgsession_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test();
+ bgsession_test 
+----------------
+             42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test2();
+INFO:  <PLyResult status=5 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]>
+ bgsession_test2 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+SELECT bgsession_test3();
+NOTICE:  notice
+ERROR:  error
+CONTEXT:  PL/pgSQL function inline_code_block line 1 at RAISE
+PL/Python function "bgsession_test3"
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+SELECT bgsession_test4();
+ERROR:  cannot set client encoding in background session
+CONTEXT:  PL/Python function "bgsession_test4"
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+SELECT bgsession_test5();
+ bgsession_test5 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a |  b  
+---+-----
+ 1 | one
+ 2 | two
+(2 rows)
+
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test7();
+INFO:  <PLyResult status=5 nrows=1 rows=[{'a': 11, 'b': None}]>
+ bgsession_test7 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+SELECT bgsession_test8();
+ERROR:  background session ended with transaction block open
+CONTEXT:  PL/Python function "bgsession_test8"
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+SELECT bgsession_test9a();
+ bgsession_test9a 
+------------------
+                1
+(1 row)
+
+SELECT bgsession_test9b();
+ bgsession_test9b 
+------------------
+                2
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 1 | 
+ 2 | 
+(2 rows)
+
+DROP TABLE test1;
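
A note on the bgsession_test8 case above: the 'Z' (ReadyForQuery)
handlers in bgsession.c store the protocol's transaction status byte in
session->transaction_status.  In the FE/BE protocol that byte is 'I'
(idle), 'T' (in a transaction block), or 'E' (in a failed transaction
block).  The following standalone sketch shows how such a byte could be
mapped to "is a transaction block still open"; the helper name
demo_xact_block_open is hypothetical and the check actually performed at
session end may differ.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper, not part of the patch: interpret the status byte
 * carried by a ReadyForQuery ('Z') message.
 */
static bool
demo_xact_block_open(char status)
{
	switch (status)
	{
		case 'I':				/* idle, not in a transaction block */
			return false;
		case 'T':				/* in a transaction block */
		case 'E':				/* in a failed transaction block */
			return true;
		default:
			/* Unknown byte: flag it in assert-enabled builds. */
			assert(0 && "unexpected ReadyForQuery status byte");
			return true;
	}
}
```
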
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412..2001b60fa4 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,8 +43,9 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
+ BackgroundSession
  Error
  Fatal
  SPIError
@@ -63,7 +64,7 @@ select module_contents();
  spiexceptions
  subtransaction
  warning
-(18 rows)
+(19 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c
new file mode 100644
index 0000000000..68f720760f
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.c
@@ -0,0 +1,454 @@
+/*
+ * the PLyBackgroundSession class
+ *
+ * src/pl/plpython/plpy_bgsession.c
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "executor/spi.h"
+#include "parser/parse_type.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+#include "plpython.h"
+
+#include "plpy_bgsession.h"
+
+#include "plpy_elog.h"
+#include "plpy_main.h"
+#include "plpy_planobject.h"
+#include "plpy_spi.h"
+
+
+static PyObject *PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw);
+static void PLyBackgroundSession_dealloc(PyObject *self);
+static PyObject *PLyBackgroundSession_enter(PyObject *self, PyObject *unused);
+static PyObject *PLyBackgroundSession_close(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_prepare(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args);
+
+static char PLyBackgroundSession_doc[] = {
+	"PostgreSQL background session context manager"
+};
+
+static PyMethodDef PLyBackgroundSession_methods[] = {
+	{"close", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"__enter__", PLyBackgroundSession_enter, METH_VARARGS, NULL},
+	{"__exit__", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"execute", PLyBackgroundSession_execute, METH_VARARGS, NULL},
+	{"prepare", PLyBackgroundSession_prepare, METH_VARARGS, NULL},
+	{"execute_prepared", PLyBackgroundSession_execute_prepared, METH_VARARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLyBackgroundSession_Type = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"plpy.BackgroundSession",		/* tp_name */
+	sizeof(PLyBackgroundSession_Object),	/* tp_basicsize */
+	0,								/* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLyBackgroundSession_dealloc,	/* tp_dealloc */
+	0,								/* tp_print */
+	0,								/* tp_getattr */
+	0,								/* tp_setattr */
+	0,								/* tp_compare */
+	0,								/* tp_repr */
+	0,								/* tp_as_number */
+	0,								/* tp_as_sequence */
+	0,								/* tp_as_mapping */
+	0,								/* tp_hash */
+	0,								/* tp_call */
+	0,								/* tp_str */
+	0,								/* tp_getattro */
+	0,								/* tp_setattro */
+	0,								/* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,	/* tp_flags */
+	PLyBackgroundSession_doc,		/* tp_doc */
+	0,								/* tp_traverse */
+	0,								/* tp_clear */
+	0,								/* tp_richcompare */
+	0,								/* tp_weaklistoffset */
+	0,								/* tp_iter */
+	0,								/* tp_iternext */
+	PLyBackgroundSession_methods,	/* tp_methods */
+	0,								/* tp_members */
+	0,								/* tp_getset */
+	0,								/* tp_base */
+	0,								/* tp_dict */
+	0,								/* tp_descr_get */
+	0,								/* tp_descr_set */
+	0,								/* tp_dictoffset */
+	0,								/* tp_init */
+	0,								/* tp_alloc */
+	PLyBackgroundSession_new,		/* tp_new */
+	0,								/* tp_free */
+};
+
+
+int
+PLy_bgsession_init_type(PyObject *module)
+{
+	if (PyType_Ready(&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	Py_INCREF(&PLyBackgroundSession_Type);
+	if (PyModule_AddObject(module, "BackgroundSession", (PyObject *)&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	return 0;
+}
+
+static PyObject *
+PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw)
+{
+	PyObject   *result = type->tp_alloc(type, 0);
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result;
+
+	bgsession->bgsession = BackgroundSessionStart();
+
+	return result;
+}
+
+/*
+ * Python requires a dealloc function to be defined
+ */
+static void
+PLyBackgroundSession_dealloc(PyObject *self)
+{
+}
+
+/*
+ * bgsession.__enter__()
+ */
+static PyObject *
+PLyBackgroundSession_enter(PyObject *self, PyObject *unused)
+{
+	Py_INCREF(self);
+	return self;
+}
+
+/*
+ * bgsession.close() or bgsession.__exit__(exc_type, exc, tb)
+ */
+static PyObject *
+PLyBackgroundSession_close(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	BackgroundSessionEnd(bgsession->bgsession);
+	bgsession->bgsession = NULL;
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+static PyObject *
+PLyBackgroundSession_execute(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (PyArg_ParseTuple(args, "s:execute", &query))
+	{
+		BackgroundSessionResult *result;
+		HeapTuple  *tuples;
+		ListCell   *lc;
+		int			i;
+		SPITupleTable faketupletable;
+
+		result = BackgroundSessionExecute(bgsession->bgsession, query);
+		if (result->tupdesc)
+		{
+			tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+			i = 0;
+			foreach (lc, result->tuples)
+			{
+				HeapTuple tuple = (HeapTuple) lfirst(lc);
+				tuples[i++] = tuple;
+			}
+			faketupletable.tupdesc = result->tupdesc;
+			faketupletable.vals = tuples;
+			return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+		}
+		else
+			return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+	}
+	else
+		PLy_exception_set(PLy_exc_error, "background session execute expected a query");
+	return NULL;
+}
+
+/* XXX lots of overlap with PLy_spi_prepare */
+static PyObject *
+PLyBackgroundSession_prepare(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+	PyObject   *paraminfo = NULL;
+	BackgroundSessionPreparedStatement *bgstmt;
+	int			nargs = 0;
+	const char **argnames = NULL;
+	PLyPlanObject *plan;
+	PyObject   *volatile optr = NULL;
+	volatile MemoryContext oldcontext;
+	int			i;
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	PyObject *keys;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &paraminfo))
+		return NULL;
+
+	if (paraminfo &&
+		!PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of prepare must be a sequence or mapping");
+		return NULL;
+	}
+
+	if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
+		return NULL;
+
+	plan->mcxt = AllocSetContextCreate(TopMemoryContext,
+									   "PL/Python background plan context",
+									   ALLOCSET_DEFAULT_MINSIZE,
+									   ALLOCSET_DEFAULT_INITSIZE,
+									   ALLOCSET_DEFAULT_MAXSIZE);
+
+	oldcontext = MemoryContextSwitchTo(plan->mcxt);
+
+	if (!paraminfo)
+		nargs = 0;
+	else if (PySequence_Check(paraminfo))
+		nargs = PySequence_Length(paraminfo);
+	else
+		nargs = PyMapping_Length(paraminfo);
+
+	plan->nargs = nargs;
+	plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL;
+	plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL;
+	plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (paraminfo && !PySequence_Check(paraminfo))
+	{
+		argnames = palloc(nargs * sizeof(char *));
+		keys = PyMapping_Keys(paraminfo);
+	}
+	else
+	{
+		argnames = NULL;
+		keys = NULL;
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		PLy_typeinfo_init(&plan->args[i], plan->mcxt);
+		plan->values[i] = PointerGetDatum(NULL);
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		char	   *sptr;
+		HeapTuple	typeTup;
+		Oid			typeId;
+		int32		typmod;
+
+		if (keys)
+		{
+			PyObject *key;
+			char *keystr;
+
+			key = PySequence_GetItem(keys, i);
+			argnames[i] = keystr = PyString_AsString(key);
+			optr = PyMapping_GetItemString(paraminfo, keystr);
+			Py_DECREF(key);
+		}
+		else
+			optr = PySequence_GetItem(paraminfo, i);
+
+		if (PyString_Check(optr))
+			sptr = PyString_AsString(optr);
+		else if (PyUnicode_Check(optr))
+			sptr = PLyUnicode_AsString(optr);
+		else
+		{
+			ereport(ERROR,
+					(errmsg("background session prepare: type name at ordinal position %d is not a string", i)));
+			sptr = NULL;	/* keep compiler quiet */
+		}
+
+		/********************************************************
+		 * Resolve argument type names and then look them up by
+		 * oid in the system cache, and remember the required
+		 * information for input conversion.
+		 ********************************************************/
+
+		parseTypeString(sptr, &typeId, &typmod, false);
+
+		typeTup = SearchSysCache1(TYPEOID,
+								  ObjectIdGetDatum(typeId));
+		if (!HeapTupleIsValid(typeTup))
+			elog(ERROR, "cache lookup failed for type %u", typeId);
+
+		Py_DECREF(optr);
+
+		/*
+		 * set optr to NULL, so we won't try to unref it again in case of
+		 * an error
+		 */
+		optr = NULL;
+
+		plan->types[i] = typeId;
+		PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes);
+		ReleaseSysCache(typeTup);
+	}
+
+	bgstmt = BackgroundSessionPrepare(bgsession->bgsession, query, nargs, plan->types, argnames);
+
+	plan->bgstmt = bgstmt;
+
+	return (PyObject *) plan;
+}
+
+static PyObject *
+PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	PyObject   *ob;
+	PLyPlanObject *plan;
+	PyObject   *list = NULL;
+	int			nargs;
+	bool	   *nulls;
+	BackgroundSessionResult *result;
+	HeapTuple  *tuples;
+	ListCell   *lc;
+	int			i;
+	SPITupleTable faketupletable;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list))
+		return NULL;
+
+	if (!is_PLyPlanObject(ob))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "first argument of execute_prepared must be a plan");
+		return NULL;
+	}
+
+	plan = (PLyPlanObject *) ob;
+
+	if (list && (!PySequence_Check(list)))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of execute_prepared must be a sequence");
+		return NULL;
+	}
+
+	nargs = list ? PySequence_Length(list) : 0;
+
+	if (nargs != plan->nargs)
+	{
+		char	   *sv;
+		PyObject   *so = PyObject_Str(list);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+							  "Expected sequence of %d argument, got %d: %s",
+							 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	nulls = palloc(nargs * sizeof(*nulls));
+
+	for (i = 0; i < nargs; i++)
+	{
+		PyObject   *elem;
+
+		elem = PySequence_GetItem(list, i);
+		if (elem != Py_None)
+		{
+			PG_TRY();
+			{
+				plan->values[i] =
+					plan->args[i].out.d.func(&(plan->args[i].out.d),
+											 -1,
+											 elem,
+											 false);
+			}
+			PG_CATCH();
+			{
+				Py_DECREF(elem);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+
+			Py_DECREF(elem);
+			nulls[i] = false;
+		}
+		else
+		{
+			Py_DECREF(elem);
+			plan->values[i] =
+				InputFunctionCall(&(plan->args[i].out.d.typfunc),
+								  NULL,
+								  plan->args[i].out.d.typioparam,
+								  -1);
+			nulls[i] = true;
+		}
+	}
+
+	result = BackgroundSessionExecutePrepared(plan->bgstmt, nargs, plan->values, nulls);
+	if (result->tupdesc)
+	{
+		tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+		i = 0;
+		foreach (lc, result->tuples)
+		{
+			HeapTuple tuple = (HeapTuple) lfirst(lc);
+			tuples[i++] = tuple;
+		}
+		faketupletable.tupdesc = result->tupdesc;
+		faketupletable.vals = tuples;
+		return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+	}
+	else
+		return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+}
diff --git a/src/pl/plpython/plpy_bgsession.h b/src/pl/plpython/plpy_bgsession.h
new file mode 100644
index 0000000000..39daca2d39
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.h
@@ -0,0 +1,18 @@
+/*
+ * src/pl/plpython/plpy_bgsession.h
+ */
+
+#ifndef PLPY_BGSESSION_H
+#define PLPY_BGSESSION_H
+
+#include "tcop/bgsession.h"
+
+typedef struct PLyBackgroundSession_Object
+{
+	PyObject_HEAD
+	BackgroundSession *bgsession;
+} PLyBackgroundSession_Object;
+
+extern int PLy_bgsession_init_type(PyObject *module);
+
+#endif   /* PLPY_BGSESSION_H */
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..b7ee34c3f2 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -7,6 +7,8 @@
 
 #include "plpy_procedure.h"
 
+#include "tcop/bgsession.h"
+
 /* the interpreter's globals dict */
 extern PyObject *PLy_interp_globals;
 
@@ -19,6 +21,7 @@ typedef struct PLyExecutionContext
 {
 	PLyProcedure *curr_proc;	/* the currently executing procedure */
 	MemoryContext scratch_ctx;	/* a context for things like type I/O */
+	BackgroundSession *bgsession;
 	struct PLyExecutionContext *next;	/* previous stack level */
 } PLyExecutionContext;
 
diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c
index 16c39a05dd..006a3afd80 100644
--- a/src/pl/plpython/plpy_planobject.c
+++ b/src/pl/plpython/plpy_planobject.c
@@ -77,6 +77,7 @@ PLy_plan_new(void)
 		return NULL;
 
 	ob->plan = NULL;
+	ob->bgstmt = NULL;
 	ob->nargs = 0;
 	ob->types = NULL;
 	ob->values = NULL;
diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h
index c67559266e..fba814ad8a 100644
--- a/src/pl/plpython/plpy_planobject.h
+++ b/src/pl/plpython/plpy_planobject.h
@@ -6,6 +6,7 @@
 #define PLPY_PLANOBJECT_H
 
 #include "executor/spi.h"
+#include "tcop/bgsession.h"
 #include "plpy_typeio.h"
 
 
@@ -13,6 +14,7 @@ typedef struct PLyPlanObject
 {
 	PyObject_HEAD
 	SPIPlanPtr	plan;
+	BackgroundSessionPreparedStatement *bgstmt;
 	int			nargs;
 	Oid		   *types;
 	Datum	   *values;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index 0cf2ad29cb..dde0bbc829 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -13,6 +13,7 @@
 
 #include "plpy_plpymodule.h"
 
+#include "plpy_bgsession.h"
 #include "plpy_cursorobject.h"
 #include "plpy_elog.h"
 #include "plpy_planobject.h"
@@ -137,6 +138,8 @@ PyInit_plpy(void)
 		return NULL;
 
 	PLy_add_exceptions(m);
+	if (PLy_bgsession_init_type(m) < 0)
+		return NULL;
 
 	return m;
 }
@@ -167,6 +170,8 @@ PLy_init_plpy(void)
 #else
 	plpy = Py_InitModule("plpy", PLy_methods);
 	PLy_add_exceptions(plpy);
+	if (PLy_bgsession_init_type(plpy) < 0)
+		PLy_elog(ERROR, "could not initialize BackgroundSession type");
 #endif
 
 	/* PyDict_SetItemString(plpy, "PlanType", (PyObject *) &PLy_PlanType); */
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..697efb20aa 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -31,8 +31,6 @@
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
-							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
 
@@ -292,6 +290,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
 							  exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -361,6 +360,7 @@ PLy_spi_execute_query(char *query, long limit)
 		pg_verifymbstr(query, strlen(query), false);
 		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
@@ -383,7 +383,7 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
+PyObject *
 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 {
 	PLyResultObject *result;
@@ -470,7 +470,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 		PG_END_TRY();
 
 		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
 	}
 
 	return (PyObject *) result;
diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h
index b0427947ef..9ed37e50f4 100644
--- a/src/pl/plpython/plpy_spi.h
+++ b/src/pl/plpython/plpy_spi.h
@@ -5,12 +5,15 @@
 #ifndef PLPY_SPI_H
 #define PLPY_SPI_H
 
+#include "executor/spi.h"
 #include "utils/palloc.h"
 #include "utils/resowner.h"
 
 extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args);
 extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args);
 
+extern PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status);
+
 typedef struct PLyExceptionEntry
 {
 	int			sqlstate;		/* hash key, must be first */
diff --git a/src/pl/plpython/sql/plpython_bgsession.sql b/src/pl/plpython/sql/plpython_bgsession.sql
new file mode 100644
index 0000000000..5c33ab29e7
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_bgsession.sql
@@ -0,0 +1,148 @@
+CREATE TABLE test1 (a int, b text);
+
+CREATE FUNCTION bgsession_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test2();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+
+SELECT bgsession_test3();
+
+
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+
+SELECT bgsession_test4();
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+
+SELECT bgsession_test5();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test7();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+
+SELECT bgsession_test8();
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+
+SELECT bgsession_test9a();
+SELECT bgsession_test9b();
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
-- 
2.11.0

From 474a9c5d20cd484e25864b93d675590047368b27 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Sun, 25 Dec 2016 12:00:00 -0500
Subject: [PATCH] dblink: Replace some macros by static functions

Also remove some unused code and the no longer useful dblink.h file.
---
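Note for reviewers (not part of the commit message): the pattern applied
throughout this patch is to turn statement-like macros, which assign to
the caller's local variables as a hidden side effect, into static
functions with explicit parameters and return values.  Below is a
minimal standalone sketch of that pattern.  It assumes a malloc-based
stand-in, dup_string(), in place of pstrdup(); XPSTRDUP_MACRO and
xpstrdup_sketch are illustrative names only and do not appear in
dblink.c.

```c
#include <stdlib.h>
#include <string.h>

/* Stand-in for pstrdup(): plain malloc-based copy, for illustration only. */
static char *
dup_string(const char *in)
{
	size_t		len = strlen(in) + 1;
	char	   *out = malloc(len);

	if (out != NULL)
		memcpy(out, in, len);
	return out;
}

/*
 * Before (macro form): the destination is assigned inside the macro, so
 * the call site does not look like an assignment.
 */
#define XPSTRDUP_MACRO(var_c, var_) \
	do { \
		if ((var_) != NULL) \
			(var_c) = dup_string(var_); \
		else \
			(var_c) = NULL; \
	} while (0)

/* After (function form): NULL-safe copy with an explicit return value. */
static char *
xpstrdup_sketch(const char *in)
{
	if (in == NULL)
		return NULL;
	return dup_string(in);
}
```

With the function form, a call site reads as an ordinary assignment,
e.g. "copy = xpstrdup_sketch(src);", and the compiler type-checks the
argument.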
 contrib/dblink/dblink.c | 290 +++++++++++++++++++++++-------------------------
 contrib/dblink/dblink.h |  39 -------
 2 files changed, 137 insertions(+), 192 deletions(-)
 delete mode 100644 contrib/dblink/dblink.h

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 5b041ddc7c..05938d0aeb 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -60,8 +60,6 @@
 #include "utils/rel.h"
 #include "utils/tqual.h"
 
-#include "dblink.h"
-
 PG_MODULE_MAGIC;
 
 typedef struct remoteConn
@@ -145,98 +143,102 @@ typedef struct remoteConnHashEnt
 /* initial number of connection hashes */
 #define NUMCONN 16
 
-/* general utility */
-#define xpfree(var_) \
-	do { \
-		if (var_ != NULL) \
-		{ \
-			pfree(var_); \
-			var_ = NULL; \
-		} \
-	} while (0)
-
-#define xpstrdup(var_c, var_) \
-	do { \
-		if (var_ != NULL) \
-			var_c = pstrdup(var_); \
-		else \
-			var_c = NULL; \
-	} while (0)
-
-#define DBLINK_RES_INTERNALERROR(p2) \
-	do { \
-			msg = pstrdup(PQerrorMessage(conn)); \
-			if (res) \
-				PQclear(res); \
-			elog(ERROR, "%s: %s", p2, msg); \
-	} while (0)
-
-#define DBLINK_CONN_NOT_AVAIL \
-	do { \
-		if(conname) \
-			ereport(ERROR, \
-					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
-					 errmsg("connection \"%s\" not available", conname))); \
-		else \
-			ereport(ERROR, \
-					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
-					 errmsg("connection not available"))); \
-	} while (0)
-
-#define DBLINK_GET_CONN \
-	do { \
-			char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
-			rconn = getConnectionByName(conname_or_str); \
-			if (rconn) \
-			{ \
-				conn = rconn->conn; \
-				conname = conname_or_str; \
-			} \
-			else \
-			{ \
-				connstr = get_connect_string(conname_or_str); \
-				if (connstr == NULL) \
-				{ \
-					connstr = conname_or_str; \
-				} \
-				dblink_connstr_check(connstr); \
-				conn = PQconnectdb(connstr); \
-				if (PQstatus(conn) == CONNECTION_BAD) \
-				{ \
-					msg = pstrdup(PQerrorMessage(conn)); \
-					PQfinish(conn); \
-					ereport(ERROR, \
-							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
-							 errmsg("could not establish connection"), \
-							 errdetail_internal("%s", msg))); \
-				} \
-				dblink_security_check(conn, rconn); \
-				if (PQclientEncoding(conn) != GetDatabaseEncoding()) \
-					PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
-				freeconn = true; \
-			} \
-	} while (0)
-
-#define DBLINK_GET_NAMED_CONN \
-	do { \
-			conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
-			rconn = getConnectionByName(conname); \
-			if (rconn) \
-				conn = rconn->conn; \
-			else \
-				DBLINK_CONN_NOT_AVAIL; \
-	} while (0)
-
-#define DBLINK_INIT \
-	do { \
-			if (!pconn) \
-			{ \
-				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
-				pconn->conn = NULL; \
-				pconn->openCursorCount = 0; \
-				pconn->newXactForCursor = FALSE; \
-			} \
-	} while (0)
+static char *
+xpstrdup(const char *in)
+{
+	if (in == NULL)
+		return NULL;
+	return pstrdup(in);
+}
+
+static void
+dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
+{
+	char	   *msg = pstrdup(PQerrorMessage(conn));
+	if (res)
+		PQclear(res);
+	elog(ERROR, "%s: %s", p2, msg);
+}
+
+static void pg_attribute_noreturn()
+dblink_conn_not_avail(const char *conname)
+{
+	if (conname)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("connection \"%s\" not available", conname)));
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("connection not available")));
+}
+
+static void
+dblink_get_conn(char *conname_or_str,
+				PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
+{
+	remoteConn *rconn = getConnectionByName(conname_or_str);
+	PGconn	   *conn;
+	char	   *conname;
+	bool		freeconn;
+
+	if (rconn)
+	{
+		conn = rconn->conn;
+		conname = conname_or_str;
+		freeconn = false;
+	}
+	else
+	{
+		const char *connstr;
+
+		connstr = get_connect_string(conname_or_str);
+		if (connstr == NULL)
+			connstr = conname_or_str;
+		dblink_connstr_check(connstr);
+		conn = PQconnectdb(connstr);
+		if (PQstatus(conn) == CONNECTION_BAD)
+		{
+			char	   *msg = pstrdup(PQerrorMessage(conn));
+			PQfinish(conn);
+			ereport(ERROR,
+					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+					 errmsg("could not establish connection"),
+					 errdetail_internal("%s", msg)));
+		}
+		dblink_security_check(conn, rconn);
+		if (PQclientEncoding(conn) != GetDatabaseEncoding())
+			PQsetClientEncoding(conn, GetDatabaseEncodingName());
+		freeconn = true;
+		conname = NULL;
+	}
+
+	*conn_p = conn;
+	*conname_p = conname;
+	*freeconn_p = freeconn;
+}
+
+static PGconn *
+dblink_get_named_conn(const char *conname)
+{
+	remoteConn *rconn = getConnectionByName(conname);
+	if (rconn)
+		return rconn->conn;
+	else
+		dblink_conn_not_avail(conname);
+}
+
+static void
+dblink_init(void)
+{
+	if (!pconn)
+	{
+		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
+		pconn->conn = NULL;
+		pconn->openCursorCount = 0;
+		pconn->newXactForCursor = FALSE;
+	}
+}
 
 /*
  * Create a persistent connection to another database
@@ -252,7 +254,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	PGconn	   *conn = NULL;
 	remoteConn *rconn = NULL;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 2)
 	{
@@ -317,7 +319,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	PGconn	   *conn = NULL;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 1)
 	{
@@ -330,7 +332,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 		conn = pconn->conn;
 
 	if (!conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 
 	PQfinish(conn);
 	if (rconn)
@@ -351,7 +353,6 @@ PG_FUNCTION_INFO_V1(dblink_open);
 Datum
 dblink_open(PG_FUNCTION_ARGS)
 {
-	char	   *msg;
 	PGresult   *res = NULL;
 	PGconn	   *conn = NULL;
 	char	   *curname = NULL;
@@ -361,7 +362,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	DBLINK_INIT;
+	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 2)
@@ -400,7 +401,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	}
 
 	if (!rconn || !rconn->conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 	else
 		conn = rconn->conn;
 
@@ -409,7 +410,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	{
 		res = PQexec(conn, "BEGIN");
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			DBLINK_RES_INTERNALERROR("begin error");
+			dblink_res_internalerror(conn, res, "begin error");
 		PQclear(res);
 		rconn->newXactForCursor = TRUE;
 
@@ -449,11 +450,10 @@ dblink_close(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	char	   *conname = NULL;
 	StringInfoData buf;
-	char	   *msg;
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	DBLINK_INIT;
+	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 1)
@@ -488,7 +488,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	}
 
 	if (!rconn || !rconn->conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 	else
 		conn = rconn->conn;
 
@@ -516,7 +516,7 @@ dblink_close(PG_FUNCTION_ARGS)
 
 			res = PQexec(conn, "COMMIT");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
-				DBLINK_RES_INTERNALERROR("commit error");
+				dblink_res_internalerror(conn, res, "commit error");
 			PQclear(res);
 		}
 	}
@@ -542,7 +542,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 4)
 	{
@@ -586,7 +586,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	}
 
 	if (!conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 
 	initStringInfo(&buf);
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
@@ -632,15 +632,13 @@ PG_FUNCTION_INFO_V1(dblink_send_query);
 Datum
 dblink_send_query(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	char	   *sql = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
+	char	   *sql;
 	int			retval;
 
 	if (PG_NARGS() == 2)
 	{
-		DBLINK_GET_NAMED_CONN;
+		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	}
 	else
@@ -670,15 +668,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 
 	PG_TRY();
 	{
-		char	   *msg;
-		char	   *connstr = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
 		bool		fail = true;	/* default to backward compatible */
 
 		if (!is_async)
@@ -686,7 +681,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 3)
 			{
 				/* text,text,bool */
-				DBLINK_GET_CONN;
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				fail = PG_GETARG_BOOL(2);
 			}
@@ -701,7 +696,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 				}
 				else
 				{
-					DBLINK_GET_CONN;
+					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				}
 			}
@@ -721,13 +716,13 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 2)
 			{
 				/* text,bool */
-				DBLINK_GET_NAMED_CONN;
+				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				DBLINK_GET_NAMED_CONN;
+				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 			}
 			else
 				/* shouldn't happen */
@@ -735,7 +730,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		}
 
 		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
+			dblink_conn_not_avail(conname);
 
 		if (!is_async)
 		{
@@ -1296,12 +1291,10 @@ PG_FUNCTION_INFO_V1(dblink_is_busy);
 Datum
 dblink_is_busy(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
 	PQconsumeInput(conn);
 	PG_RETURN_INT32(PQisBusy(conn));
@@ -1322,15 +1315,13 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res = 0;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	int			res;
+	PGconn	   *conn;
 	PGcancel   *cancel;
 	char		errbuf[256];
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	cancel = PQgetCancel(conn);
 
 	res = PQcancel(cancel, errbuf, 256);
@@ -1358,12 +1349,10 @@ Datum
 dblink_error_message(PG_FUNCTION_ARGS)
 {
 	char	   *msg;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
 	msg = PQerrorMessage(conn);
 	if (msg == NULL || msg[0] == '\0')
@@ -1383,22 +1372,19 @@ dblink_exec(PG_FUNCTION_ARGS)
 	PGconn	   *volatile conn = NULL;
 	volatile bool freeconn = false;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	PG_TRY();
 	{
-		char	   *msg;
 		PGresult   *res = NULL;
-		char	   *connstr = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
 		bool		fail = true;	/* default to backward compatible behavior */
 
 		if (PG_NARGS() == 3)
 		{
 			/* must be text,text,bool */
-			DBLINK_GET_CONN;
+			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			fail = PG_GETARG_BOOL(2);
 		}
@@ -1413,7 +1399,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 			}
 			else
 			{
-				DBLINK_GET_CONN;
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
 		}
@@ -1428,7 +1414,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 			elog(ERROR, "wrong number of arguments");
 
 		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
+			dblink_conn_not_avail(conname);
 
 		res = PQexec(conn, sql);
 		if (!res ||
@@ -1879,9 +1865,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify);
 Datum
 dblink_get_notify(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 	PGnotify   *notify;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
@@ -1891,9 +1875,9 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 	if (PG_NARGS() == 1)
-		DBLINK_GET_NAMED_CONN;
+		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	else
 		conn = pconn->conn;
 
@@ -2697,10 +2681,10 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 	else
 		sqlstate = ERRCODE_CONNECTION_FAILURE;
 
-	xpstrdup(message_primary, pg_diag_message_primary);
-	xpstrdup(message_detail, pg_diag_message_detail);
-	xpstrdup(message_hint, pg_diag_message_hint);
-	xpstrdup(message_context, pg_diag_context);
+	message_primary = xpstrdup(pg_diag_message_primary);
+	message_detail = xpstrdup(pg_diag_message_detail);
+	message_hint = xpstrdup(pg_diag_message_hint);
+	message_context = xpstrdup(pg_diag_context);
 
 	/*
 	 * If we don't get a message from the PGresult, try the PGconn.  This
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
deleted file mode 100644
index 1102236967..0000000000
--- a/contrib/dblink/dblink.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * dblink.h
- *
- * Functions returning results from a remote database
- *
- * Joe Conway <m...@joeconway.com>
- * And contributors:
- * Darko Prenosil <darko.preno...@finteh.hr>
- * Shridhar Daithankar <shridhar_daithan...@persistent.co.in>
- *
- * contrib/dblink/dblink.h
- * Copyright (c) 2001-2016, PostgreSQL Global Development Group
- * ALL RIGHTS RESERVED;
- *
- * Permission to use, copy, modify, and distribute this software and its
- * documentation for any purpose, without fee, and without a written agreement
- * is hereby granted, provided that the above copyright notice and this
- * paragraph and the following two paragraphs appear in all copies.
- *
- * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
- * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
- * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
- * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
- * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
- * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
- * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
- * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
- *
- */
-
-#ifndef DBLINK_H
-#define DBLINK_H
-
-#include "fmgr.h"
-
-#endif   /* DBLINK_H */
-- 
2.11.0

From 5183b8fe31eb5c7216bdb6979b2526c4bc6c14cd Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Tue, 27 Dec 2016 12:00:00 -0500
Subject: [PATCH] dblink: Add background sessions support

---
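Note for reviewers (not part of the commit message): this patch adds a
second handle, bgconn, to remoteConn next to the existing libpq handle,
and passes the remoteConn around instead of a bare PGconn.  Below is a
rough standalone sketch of how a caller might tell the two kinds of
connection apart.  Everything in it is an assumption made for
illustration: FakePGconn and FakeBgSession stand in for PGconn and
BackgroundSession, and remoteConnSketch, ConnKind and conn_kind() are
hypothetical names that are not part of the patch.

```c
#include <stddef.h>

/* Hypothetical stand-ins; the real types are PGconn and BackgroundSession. */
typedef struct FakePGconn { int unused; } FakePGconn;
typedef struct FakeBgSession { int unused; } FakeBgSession;

/* Simplified connection entry holding at most one live handle. */
typedef struct remoteConnSketch
{
	FakePGconn *conn;			/* set for an ordinary libpq connection */
	FakeBgSession *bgconn;		/* set for a background session */
} remoteConnSketch;

typedef enum
{
	CONN_NONE,
	CONN_LIBPQ,
	CONN_BGSESSION
} ConnKind;

/* Classify an entry by which handle, if any, is populated. */
static ConnKind
conn_kind(const remoteConnSketch *rconn)
{
	if (rconn == NULL)
		return CONN_NONE;
	if (rconn->bgconn != NULL)
		return CONN_BGSESSION;
	if (rconn->conn != NULL)
		return CONN_LIBPQ;
	return CONN_NONE;
}
```

A function that serves both kinds of connection could branch on this
result once and report "connection not available" for CONN_NONE.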
 contrib/dblink/dblink--1.2.sql     |  10 +
 contrib/dblink/dblink.c            | 557 +++++++++++++++++++++++--------------
 contrib/dblink/expected/dblink.out |  95 +++++++
 contrib/dblink/sql/dblink.sql      |  20 ++
 doc/src/sgml/dblink.sgml           |  37 +++
 5 files changed, 510 insertions(+), 209 deletions(-)

diff --git a/contrib/dblink/dblink--1.2.sql b/contrib/dblink/dblink--1.2.sql
index 405eccb0ff..1bb01a679e 100644
--- a/contrib/dblink/dblink--1.2.sql
+++ b/contrib/dblink/dblink--1.2.sql
@@ -31,6 +31,16 @@ CREATE FUNCTION dblink_connect_u (text, text)
 REVOKE ALL ON FUNCTION dblink_connect_u (text) FROM public;
 REVOKE ALL ON FUNCTION dblink_connect_u (text, text) FROM public;
 
+CREATE FUNCTION dblink_connect_self ()
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_connect_self'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION dblink_connect_self (text)
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_connect_self'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
 CREATE FUNCTION dblink_disconnect ()
 RETURNS text
 AS 'MODULE_PATHNAME','dblink_disconnect'
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 05938d0aeb..10c132b1cf 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -51,6 +51,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
+#include "tcop/bgsession.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -65,6 +66,7 @@ PG_MODULE_MAGIC;
 typedef struct remoteConn
 {
 	PGconn	   *conn;			/* Hold the remote connection */
+	BackgroundSession *bgconn;	/* Hold the background session, if used */
 	int			openCursorCount;	/* The number of open cursors */
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
@@ -88,6 +90,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
 				  PGresult *res);
+static void materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res);
 static void materializeQueryResult(FunctionCallInfo fcinfo,
 					   PGconn *conn,
 					   const char *conname,
@@ -110,7 +113,7 @@ static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumat
 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
 static char *generate_relation_name(Relation rel);
 static void dblink_connstr_check(const char *connstr);
-static void dblink_security_check(PGconn *conn, remoteConn *rconn);
+static void dblink_security_check(remoteConn *rconn);
 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 							 const char *dblink_context_msg, bool fail);
 static char *get_connect_string(const char *servername);
@@ -175,22 +178,21 @@ dblink_conn_not_avail(const char *conname)
 
 static void
 dblink_get_conn(char *conname_or_str,
-				PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
+				remoteConn * volatile *rconn_p, char **conname_p, volatile bool *freeconn_p)
 {
 	remoteConn *rconn = getConnectionByName(conname_or_str);
-	PGconn	   *conn;
 	char	   *conname;
 	bool		freeconn;
 
 	if (rconn)
 	{
-		conn = rconn->conn;
 		conname = conname_or_str;
 		freeconn = false;
 	}
 	else
 	{
 		const char *connstr;
+		PGconn	   *conn;
 
 		connstr = get_connect_string(conname_or_str);
 		if (connstr == NULL)
@@ -206,38 +208,52 @@ dblink_get_conn(char *conname_or_str,
 					 errmsg("could not establish connection"),
 					 errdetail_internal("%s", msg)));
 		}
-		dblink_security_check(conn, rconn);
 		if (PQclientEncoding(conn) != GetDatabaseEncoding())
 			PQsetClientEncoding(conn, GetDatabaseEncodingName());
+		rconn = palloc0(sizeof(*rconn));
+		rconn->conn = conn;
 		freeconn = true;
 		conname = NULL;
+		dblink_security_check(rconn);
 	}
 
-	*conn_p = conn;
+	*rconn_p = rconn;
 	*conname_p = conname;
 	*freeconn_p = freeconn;
 }
 
-static PGconn *
+static void
+dblink_finish_conn(remoteConn *rconn)
+{
+	if (rconn->conn)
+	{
+		PQfinish(rconn->conn);
+		rconn->conn = NULL;
+	}
+	if (rconn->bgconn)
+	{
+		BackgroundSessionEnd(rconn->bgconn);
+		rconn->bgconn = NULL;
+	}
+}
+
+static remoteConn *
 dblink_get_named_conn(const char *conname)
 {
 	remoteConn *rconn = getConnectionByName(conname);
 	if (rconn)
-		return rconn->conn;
+		return rconn;
 	else
 		dblink_conn_not_avail(conname);
 }
 
 static void
-dblink_init(void)
+dblink_bgsession_not_supported(const remoteConn *rconn)
 {
-	if (!pconn)
-	{
-		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
-		pconn->conn = NULL;
-		pconn->openCursorCount = 0;
-		pconn->newXactForCursor = FALSE;
-	}
+	if (!rconn->conn)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not supported with background session connection")));
 }
 
 /*
@@ -254,8 +270,6 @@ dblink_connect(PG_FUNCTION_ARGS)
 	PGconn	   *conn = NULL;
 	remoteConn *rconn = NULL;
 
-	dblink_init();
-
 	if (PG_NARGS() == 2)
 	{
 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
@@ -264,8 +278,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	else if (PG_NARGS() == 1)
 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
-	if (connname)
-		rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
+	rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext,
 												  sizeof(remoteConn));
 
 	/* first check for valid foreign data server */
@@ -291,19 +304,46 @@ dblink_connect(PG_FUNCTION_ARGS)
 	}
 
 	/* check password actually used if not superuser */
-	dblink_security_check(conn, rconn);
+	/* set rconn->conn first; the check is skipped while it is NULL */
+	rconn->conn = conn;
+	dblink_security_check(rconn);
 
 	/* attempt to set client encoding to match server encoding, if needed */
 	if (PQclientEncoding(conn) != GetDatabaseEncoding())
 		PQsetClientEncoding(conn, GetDatabaseEncodingName());
 
 	if (connname)
-	{
-		rconn->conn = conn;
 		createNewConnection(connname, rconn);
-	}
 	else
-		pconn->conn = conn;
+		pconn = rconn;
+
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+PG_FUNCTION_INFO_V1(dblink_connect_self);
+Datum
+dblink_connect_self(PG_FUNCTION_ARGS)
+{
+	char	   *connname = NULL;
+	BackgroundSession *conn = NULL;
+	remoteConn *rconn = NULL;
+	MemoryContext oldcontext;
+
+	if (PG_NARGS() == 1)
+		connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	conn = BackgroundSessionStart();
+	MemoryContextSwitchTo(oldcontext);
+
+	rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext,
+												  sizeof(remoteConn));
+	rconn->bgconn = conn;
+
+	if (connname)
+		createNewConnection(connname, rconn);
+	else
+		pconn = rconn;
 
 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
 }
@@ -317,31 +357,25 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 {
 	char	   *conname = NULL;
 	remoteConn *rconn = NULL;
-	PGconn	   *conn = NULL;
-
-	dblink_init();
 
 	if (PG_NARGS() == 1)
 	{
 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 	else
-		conn = pconn->conn;
+		rconn = pconn;
 
-	if (!conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
 
-	PQfinish(conn);
-	if (rconn)
-	{
+	dblink_finish_conn(rconn);
+	pfree(rconn);
+
+	if (conname)
 		deleteConnection(conname);
-		pfree(rconn);
-	}
 	else
-		pconn->conn = NULL;
+		pconn = NULL;
 
 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
 }
@@ -362,7 +396,6 @@ dblink_open(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 2)
@@ -400,10 +433,12 @@ dblink_open(PG_FUNCTION_ARGS)
 		rconn = getConnectionByName(conname);
 	}
 
-	if (!rconn || !rconn->conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
-	else
-		conn = rconn->conn;
+
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
 
 	/* If we are not in a transaction, start one */
 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
@@ -453,7 +488,6 @@ dblink_close(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 1)
@@ -487,10 +521,12 @@ dblink_close(PG_FUNCTION_ARGS)
 		rconn = getConnectionByName(conname);
 	}
 
-	if (!rconn || !rconn->conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
-	else
-		conn = rconn->conn;
+
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
 
 	appendStringInfo(&buf, "CLOSE %s", curname);
 
@@ -542,8 +578,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
-
 	if (PG_NARGS() == 4)
 	{
 		/* text,text,int,bool */
@@ -553,8 +587,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		fail = PG_GETARG_BOOL(3);
 
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 	else if (PG_NARGS() == 3)
 	{
@@ -564,7 +596,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			howmany = PG_GETARG_INT32(1);
 			fail = PG_GETARG_BOOL(2);
-			conn = pconn->conn;
+			rconn = pconn;
 		}
 		else
 		{
@@ -573,8 +605,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 			howmany = PG_GETARG_INT32(2);
 
 			rconn = getConnectionByName(conname);
-			if (rconn)
-				conn = rconn->conn;
 		}
 	}
 	else if (PG_NARGS() == 2)
@@ -582,12 +612,16 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		/* text,int */
 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		howmany = PG_GETARG_INT32(1);
-		conn = pconn->conn;
+		rconn = pconn;
 	}
 
-	if (!conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
 
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
+
 	initStringInfo(&buf);
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
@@ -632,23 +666,30 @@ PG_FUNCTION_INFO_V1(dblink_send_query);
 Datum
 dblink_send_query(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn *rconn;
 	char	   *sql;
 	int			retval;
 
 	if (PG_NARGS() == 2)
 	{
-		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+		rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	}
 	else
 		/* shouldn't happen */
 		elog(ERROR, "wrong number of arguments");
 
-	/* async query send */
-	retval = PQsendQuery(conn, sql);
-	if (retval != 1)
-		elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
+	if (rconn->conn)
+	{
+		retval = PQsendQuery(rconn->conn, sql);
+		if (retval != 1)
+			elog(NOTICE, "could not send query: %s", PQerrorMessage(rconn->conn));
+	}
+	else
+	{
+		BackgroundSessionSend(rconn->bgconn, sql);
+		retval = 1;
+	}
 
 	PG_RETURN_INT32(retval);
 }
@@ -663,13 +704,11 @@ dblink_get_result(PG_FUNCTION_ARGS)
 static Datum
 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 {
-	PGconn	   *volatile conn = NULL;
+	remoteConn *volatile rconn = NULL;
 	volatile bool freeconn = false;
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
-
 	PG_TRY();
 	{
 		char	   *sql = NULL;
@@ -681,7 +720,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 3)
 			{
 				/* text,text,bool */
-				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				fail = PG_GETARG_BOOL(2);
 			}
@@ -690,20 +729,20 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 				/* text,text or text,bool */
 				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 				{
-					conn = pconn->conn;
+					rconn = pconn;
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 					fail = PG_GETARG_BOOL(1);
 				}
 				else
 				{
-					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				}
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				conn = pconn->conn;
+				rconn = pconn;
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			}
 			else
@@ -716,61 +755,75 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 2)
 			{
 				/* text,bool */
-				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+				rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+				rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 			}
 			else
 				/* shouldn't happen */
 				elog(ERROR, "wrong number of arguments");
 		}
 
-		if (!conn)
+		if (!rconn)
 			dblink_conn_not_avail(conname);
 
 		if (!is_async)
 		{
-			/* synchronous query, use efficient tuple collection method */
-			materializeQueryResult(fcinfo, conn, conname, sql, fail);
+			if (rconn->conn)
+				/* synchronous query, use efficient tuple collection method */
+				materializeQueryResult(fcinfo, rconn->conn, conname, sql, fail);
+			else if (rconn->bgconn)
+			{
+				BackgroundSessionResult *res = BackgroundSessionExecute(rconn->bgconn, sql);
+				materializeBgResult(fcinfo, rconn->bgconn, res);
+			}
 		}
 		else
 		{
 			/* async result retrieval, do it the old way */
-			PGresult   *res = PQgetResult(conn);
-
-			/* NULL means we're all done with the async results */
-			if (res)
+			if (rconn->conn)
 			{
-				if (PQresultStatus(res) != PGRES_COMMAND_OK &&
-					PQresultStatus(res) != PGRES_TUPLES_OK)
-				{
-					dblink_res_error(conn, conname, res,
-									 "could not execute query", fail);
-					/* if fail isn't set, we'll return an empty query result */
-				}
-				else
+				PGresult   *res = PQgetResult(rconn->conn);
+
+				/* NULL means we're all done with the async results */
+				if (res)
 				{
-					materializeResult(fcinfo, conn, res);
+					if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+						PQresultStatus(res) != PGRES_TUPLES_OK)
+					{
+						dblink_res_error(rconn->conn, conname, res,
+										 "could not execute query", fail);
+						/* if fail isn't set, we'll return an empty query result */
+					}
+					else
+					{
+						materializeResult(fcinfo, rconn->conn, res);
+					}
 				}
 			}
+			else
+			{
+				BackgroundSessionResult *res = BackgroundSessionGetResult(rconn->bgconn);
+				materializeBgResult(fcinfo, rconn->bgconn, res);
+			}
 		}
 	}
 	PG_CATCH();
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-			PQfinish(conn);
+			dblink_finish_conn(rconn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
 	/* if needed, close the connection to the database */
 	if (freeconn)
-		PQfinish(conn);
+		dblink_finish_conn(rconn);
 
 	return (Datum) 0;
 }
@@ -805,6 +858,45 @@ prepTuplestoreResult(FunctionCallInfo fcinfo)
 }
 
 /*
+ * get a tuple descriptor for our result type
+ */
+static TupleDesc
+dblink_get_call_result_type(FunctionCallInfo fcinfo, int nfields)
+{
+	TupleDesc	tupdesc;
+
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+	{
+		case TYPEFUNC_COMPOSITE:
+			/* success */
+			break;
+		case TYPEFUNC_RECORD:
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
+
+	/*
+	 * check result and tuple descriptor have the same number of columns
+	 */
+	if (nfields != tupdesc->natts)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("remote query result rowtype does not match "
+						"the specified FROM clause rowtype")));
+
+	/* make sure we have a persistent copy of the tupdesc */
+	return CreateTupleDescCopy(tupdesc);
+}
+
+/*
  * Copy the contents of the PGresult into a tuplestore to be returned
  * as the result of the current function.
  * The PGresult will be released in this function.
@@ -843,41 +935,11 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
 
 			is_sql_cmd = false;
-
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
-
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
 			ntuples = PQntuples(res);
 			nfields = PQnfields(res);
+			tupdesc = dblink_get_call_result_type(fcinfo, nfields);
 		}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
-
 		if (ntuples > 0)
 		{
 			AttInMetadata *attinmeta;
@@ -947,6 +1009,95 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
 	PG_END_TRY();
 }
 
+static void
+materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	bool		is_sql_cmd;
+	int			nfields;
+	AttInMetadata *attinmeta;
+	HeapTuple	outtuple;
+	Tuplestorestate *tupstore;
+	MemoryContext oldcontext;
+	char	  **values;
+
+	/* prepTuplestoreResult must have been called previously */
+	Assert(rsinfo->returnMode == SFRM_Materialize);
+
+	if (!res->tupdesc)
+	{
+		is_sql_cmd = true;
+
+		/*
+		 * need a tuple descriptor representing one TEXT column to return
+		 * the command status string as our result tuple
+		 */
+		tupdesc = CreateTemplateTupleDesc(1, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+						   TEXTOID, -1, 0);
+		nfields = 1;
+	}
+	else
+	{
+		is_sql_cmd = false;
+		nfields = res->tupdesc->natts;
+		tupdesc = dblink_get_call_result_type(fcinfo, nfields);
+	}
+
+	attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+	oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+	MemoryContextSwitchTo(oldcontext);
+
+	values = (char **) palloc(nfields * sizeof(char *));
+
+	if (is_sql_cmd)
+	{
+		values[0] = (char *) res->command;
+		outtuple = BuildTupleFromCStrings(attinmeta, values);
+		tuplestore_puttuple(tupstore, outtuple);
+	}
+	else
+	{
+		ListCell   *lc;
+
+		foreach (lc, res->tuples)
+		{
+			HeapTuple intuple = (HeapTuple) lfirst(lc);
+			int			i;
+
+			for (i = 0; i < nfields; i++)
+			{
+				bool		isnull;
+				Datum		val;
+
+				val = heap_getattr(intuple, i+1, res->tupdesc, &isnull);
+				if (isnull)
+					values[i] = NULL;
+				else
+				{
+					Oid		typOutput;
+					bool	typIsVarlena;
+
+					getTypeOutputInfo(res->tupdesc->attrs[i]->atttypid, &typOutput, &typIsVarlena);
+					values[i] = DatumGetCString(OidFunctionCall1(typOutput, val));
+				}
+			}
+
+			outtuple = BuildTupleFromCStrings(attinmeta, values);
+			tuplestore_puttuple(tupstore, outtuple);
+		}
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+}
+
 /*
  * Execute the given SQL command and store its results into a tuplestore
  * to be returned as the result of the current function.
@@ -1164,34 +1315,7 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
 			tuplestore_end(sinfo->tuplestore);
 		sinfo->tuplestore = NULL;
 
-		/* get a tuple descriptor for our result type */
-		switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
-		{
-			case TYPEFUNC_COMPOSITE:
-				/* success */
-				break;
-			case TYPEFUNC_RECORD:
-				/* failed to determine actual type of RECORD */
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						 errmsg("function returning record called in context "
-								"that cannot accept type record")));
-				break;
-			default:
-				/* result type isn't composite */
-				elog(ERROR, "return type must be a row type");
-				break;
-		}
-
-		/* make sure we have a persistent copy of the tupdesc */
-		tupdesc = CreateTupleDescCopy(tupdesc);
-
-		/* check result and tuple descriptor have the same number of columns */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+		tupdesc = dblink_get_call_result_type(sinfo->fcinfo, nfields);
 
 		/* Prepare attinmeta for later data conversions */
 		sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
@@ -1291,13 +1415,14 @@ PG_FUNCTION_INFO_V1(dblink_is_busy);
 Datum
 dblink_is_busy(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn *rconn;
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
-	PQconsumeInput(conn);
-	PG_RETURN_INT32(PQisBusy(conn));
+	dblink_bgsession_not_supported(rconn);
+
+	PQconsumeInput(rconn->conn);
+	PG_RETURN_INT32(PQisBusy(rconn->conn));
 }
 
 /*
@@ -1316,13 +1441,13 @@ Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
 	int			res;
-	PGconn	   *conn;
+	remoteConn *rconn;
 	PGcancel   *cancel;
 	char		errbuf[256];
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	dblink_bgsession_not_supported(rconn);
+	cancel = PQgetCancel(rconn->conn);
 
 	res = PQcancel(cancel, errbuf, 256);
 	PQfreeCancel(cancel);
@@ -1349,12 +1474,12 @@ Datum
 dblink_error_message(PG_FUNCTION_ARGS)
 {
 	char	   *msg;
-	PGconn	   *conn;
+	remoteConn *rconn;
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	dblink_bgsession_not_supported(rconn);
 
-	msg = PQerrorMessage(conn);
+	msg = PQerrorMessage(rconn->conn);
 	if (msg == NULL || msg[0] == '\0')
 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
 	else
@@ -1369,14 +1494,11 @@ Datum
 dblink_exec(PG_FUNCTION_ARGS)
 {
 	text	   *volatile sql_cmd_status = NULL;
-	PGconn	   *volatile conn = NULL;
+	remoteConn *volatile rconn = NULL;
 	volatile bool freeconn = false;
 
-	dblink_init();
-
 	PG_TRY();
 	{
-		PGresult   *res = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
 		bool		fail = true;	/* default to backward compatible behavior */
@@ -1384,7 +1506,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		if (PG_NARGS() == 3)
 		{
 			/* must be text,text,bool */
-			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			fail = PG_GETARG_BOOL(2);
 		}
@@ -1393,72 +1515,89 @@ dblink_exec(PG_FUNCTION_ARGS)
 			/* might be text,text or text,bool */
 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 			{
-				conn = pconn->conn;
+				rconn = pconn;
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else
 			{
-				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
 		}
 		else if (PG_NARGS() == 1)
 		{
 			/* must be single text argument */
-			conn = pconn->conn;
+			rconn = pconn;
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		}
 		else
 			/* shouldn't happen */
 			elog(ERROR, "wrong number of arguments");
 
-		if (!conn)
+		if (!rconn)
 			dblink_conn_not_avail(conname);
 
-		res = PQexec(conn, sql);
-		if (!res ||
-			(PQresultStatus(res) != PGRES_COMMAND_OK &&
-			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		if (rconn->conn)
 		{
-			dblink_res_error(conn, conname, res,
-							 "could not execute command", fail);
+			PGresult   *res;
 
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text("ERROR");
-		}
-		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text(PQcmdStatus(res));
-			PQclear(res);
+			res = PQexec(rconn->conn, sql);
+
+			if (!res ||
+				(PQresultStatus(res) != PGRES_COMMAND_OK &&
+				 PQresultStatus(res) != PGRES_TUPLES_OK))
+			{
+				dblink_res_error(rconn->conn, conname, res,
+								 "could not execute command", fail);
+
+				/*
+				 * and save a copy of the command status string to return as our
+				 * result tuple
+				 */
+				sql_cmd_status = cstring_to_text("ERROR");
+			}
+			else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+			{
+				/*
+				 * and save a copy of the command status string to return as our
+				 * result tuple
+				 */
+				sql_cmd_status = cstring_to_text(PQcmdStatus(res));
+				PQclear(res);
+			}
+			else
+			{
+				PQclear(res);
+				ereport(ERROR,
+						(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+						 errmsg("statement returning results not allowed")));
+			}
 		}
 		else
 		{
-			PQclear(res);
-			ereport(ERROR,
-				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-				   errmsg("statement returning results not allowed")));
+			BackgroundSessionResult *res;
+
+			res = BackgroundSessionExecute(rconn->bgconn, sql);
+			if (res->tupdesc)
+				ereport(ERROR,
+						(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+						 errmsg("statement returning results not allowed")));
+			sql_cmd_status = cstring_to_text(res->command);
 		}
 	}
 	PG_CATCH();
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-			PQfinish(conn);
+			dblink_finish_conn(rconn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
 	/* if needed, close the connection to the database */
 	if (freeconn)
-		PQfinish(conn);
+		dblink_finish_conn(rconn);
 
 	PG_RETURN_TEXT_P(sql_cmd_status);
 }
@@ -1865,7 +2004,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify);
 Datum
 dblink_get_notify(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn *rconn;
 	PGnotify   *notify;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
@@ -1875,11 +2014,12 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
 	if (PG_NARGS() == 1)
-		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+		rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	else
-		conn = pconn->conn;
+		rconn = pconn;
+
+	dblink_bgsession_not_supported(rconn);
 
 	/* create the tuplestore in per-query memory */
 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
@@ -1899,8 +2039,8 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	MemoryContextSwitchTo(oldcontext);
 
-	PQconsumeInput(conn);
-	while ((notify = PQnotifies(conn)) != NULL)
+	PQconsumeInput(rconn->conn);
+	while ((notify = PQnotifies(rconn->conn)) != NULL)
 	{
 		Datum		values[DBLINK_NOTIFY_COLS];
 		bool		nulls[DBLINK_NOTIFY_COLS];
@@ -1923,7 +2063,7 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 
 		PQfreemem(notify);
-		PQconsumeInput(conn);
+		PQconsumeInput(rconn->conn);
 	}
 
 	/* clean up and return the tuplestore */
@@ -2556,7 +2696,7 @@ createNewConnection(const char *name, remoteConn *rconn)
 
 	if (found)
 	{
-		PQfinish(rconn->conn);
+		dblink_finish_conn(rconn);
 		pfree(rconn);
 
 		ereport(ERROR,
@@ -2591,15 +2731,14 @@ deleteConnection(const char *name)
 }
 
 static void
-dblink_security_check(PGconn *conn, remoteConn *rconn)
+dblink_security_check(remoteConn *rconn)
 {
-	if (!superuser())
+	if (!superuser() && rconn->conn)
 	{
-		if (!PQconnectionUsedPassword(conn))
+		if (!PQconnectionUsedPassword(rconn->conn))
 		{
-			PQfinish(conn);
-			if (rconn)
-				pfree(rconn);
+			PQfinish(rconn->conn);
+			pfree(rconn);
 
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 5acaaf225a..b2369b2a5f 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -1126,3 +1126,98 @@ SELECT dblink_disconnect('myconn');
 RESET datestyle;
 RESET intervalstyle;
 RESET timezone;
+-- background sessions
+SELECT * FROM foo;
+ f1 | f2 |      f3       
+----+----+---------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+  3 | d  | {a3,b3,c3}
+  4 | e  | {a4,b4,c4}
+  5 | f  | {a5,b5,c5}
+  6 | g  | {a6,b6,c6}
+  7 | h  | {a7,b7,c7}
+  8 | i  | {a8,b8,c8}
+  9 | j  | {a9,b9,c9}
+ 10 | k  | {a10,b10,c10}
+(11 rows)
+
+SELECT dblink_connect_self();
+ dblink_connect_self 
+---------------------
+ OK
+(1 row)
+
+SELECT dblink_exec('SELECT * FROM foo');
+ERROR:  statement returning results not allowed
+SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5');
+ dblink_exec 
+-------------
+ DELETE 5
+(1 row)
+
+SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]);
+ a | b |     c      
+---+---+------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+ 3 | d | {a3,b3,c3}
+ 4 | e | {a4,b4,c4}
+ 5 | f | {a5,b5,c5}
+(6 rows)
+
+SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text);
+  status  
+----------
+ DELETE 0
+(1 row)
+
+SELECT dblink_disconnect();
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
+SELECT * FROM foo;
+ f1 | f2 |     f3     
+----+----+------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+  3 | d  | {a3,b3,c3}
+  4 | e  | {a4,b4,c4}
+  5 | f  | {a5,b5,c5}
+(6 rows)
+
+-- asynchronous case
+SELECT dblink_connect_self('myconn');
+ dblink_connect_self 
+---------------------
+ OK
+(1 row)
+
+SELECT *
+FROM dblink_send_query('myconn',
+    'SELECT * FROM
+     (VALUES (''12.03.2013 00:00:00+00''),
+             (''12.03.2013 00:00:00+00'')) t');
+ dblink_send_query 
+-------------------
+                 1
+(1 row)
+
+SELECT * from dblink_get_result('myconn') as t(t timestamptz);
+              t               
+------------------------------
+ Mon Dec 02 16:00:00 2013 PST
+ Mon Dec 02 16:00:00 2013 PST
+(2 rows)
+
+SELECT dblink_disconnect('myconn');
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index 681cf6a6e8..7f66a98aa5 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -563,3 +563,23 @@ CREATE TEMPORARY TABLE result (t timestamptz);
 RESET datestyle;
 RESET intervalstyle;
 RESET timezone;
+
+-- background sessions
+SELECT * FROM foo;
+SELECT dblink_connect_self();
+SELECT dblink_exec('SELECT * FROM foo');
+SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5');
+SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]);
+SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text);
+SELECT dblink_disconnect();
+SELECT * FROM foo;
+
+-- asynchronous case
+SELECT dblink_connect_self('myconn');
+SELECT *
+FROM dblink_send_query('myconn',
+    'SELECT * FROM
+     (VALUES (''12.03.2013 00:00:00+00''),
+             (''12.03.2013 00:00:00+00'')) t');
+SELECT * from dblink_get_result('myconn') as t(t timestamptz);
+SELECT dblink_disconnect('myconn');
diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml
index f19c6b19f5..4b070ddb23 100644
--- a/doc/src/sgml/dblink.sgml
+++ b/doc/src/sgml/dblink.sgml
@@ -239,6 +239,43 @@ <title>Description</title>
   </refsect1>
  </refentry>
 
+ <refentry id="CONTRIB-DBLINK-CONNECT-SELF">
+  <indexterm>
+   <primary>dblink_connect_self</primary>
+  </indexterm>
+
+  <refmeta>
+   <refentrytitle>dblink_connect_self</refentrytitle>
+   <manvolnum>3</manvolnum>
+  </refmeta>
+
+  <refnamediv>
+   <refname>dblink_connect_self</refname>
+   <refpurpose>opens a persistent connection to the same database</refpurpose>
+  </refnamediv>
+
+  <refsynopsisdiv>
+<synopsis>
+dblink_connect_self() returns text
+dblink_connect_self(text connname) returns text
+</synopsis>
+  </refsynopsisdiv>
+
+  <refsect1>
+   <title>Description</title>
+
+   <para>
+    <function>dblink_connect_self()</> opens a connection to the current
+    database as the current user, using the background session API
+    (<xref linkend="bgsession">).
+   </para>
+
+   <para>
+    For further details see <function>dblink_connect()</>.
+   </para>
+  </refsect1>
+ </refentry>
+
  <refentry id="CONTRIB-DBLINK-DISCONNECT">
   <indexterm>
    <primary>dblink_disconnect</primary>
-- 
2.11.0
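
For readers skimming the diff above: dblink_finish_conn closes whichever handle is set and then resets the pointer to NULL. A minimal standalone sketch of that pattern follows, assuming hypothetical types (DemoHandle, DemoConn) and a counter in place of PQfinish and BackgroundSessionEnd. It only illustrates why clearing the pointer makes a repeated call harmless.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical resource handle; the counter records how often it was closed. */
typedef struct DemoHandle
{
	int			close_calls;
} DemoHandle;

/* Hypothetical connection record with the same two-pointer shape. */
typedef struct DemoConn
{
	DemoHandle *conn;			/* stands in for PGconn */
	DemoHandle *bgconn;			/* stands in for BackgroundSession */
} DemoConn;

/* Stand-in for PQfinish() or BackgroundSessionEnd(). */
static void
demo_close(DemoHandle *h)
{
	h->close_calls++;
}

/* Close whichever handle is set and clear it, so a second call does nothing. */
static void
demo_finish_conn(DemoConn *rconn)
{
	if (rconn->conn)
	{
		demo_close(rconn->conn);
		rconn->conn = NULL;
	}
	if (rconn->bgconn)
	{
		demo_close(rconn->bgconn);
		rconn->bgconn = NULL;
	}
}
```

Calling demo_finish_conn twice on the same record closes the underlying handle once, because the first call leaves both pointers NULL.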

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
