I revised the patch so that async scan will be done more
aggressively, and took execution time for two very simple cases.

As the result, simple seq scan gained 5% and hash join of two
foreign tables gained 150%. (2.4 times faster).

While measuring the performance, I noticed that each scan in a
query runs at once rather than alternating with each other in
many cases such as hash join or sorted joins and so. So I
modified the patch so that async fetch is done more
aggressively. The new v4 patch is attached. The following numbers
are taken based on it.

========
Simple seq scan for the first test.

> CREATE TABLE lt1 (a int, b timestamp, c text);
> CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 
> 'localhost');
> CREATE USER MAPPING FOR PUBLIC SERVER sv1;
> CREATE FOREIGN TABLE ft1 () SERVER sv1 OPTIONS (table_name 'lt1');
> INSERT INTO lt1 (SELECT a, now(), repeat('x', 128) FROM generate_series(0, 
> 999999) a);

On this case, I took the the 10 times average of exec time of the
following query for both master head and patched version.  The
fetch size is 100.

> postgres=# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT * FROM ft1;
>                          QUERY PLAN                  
> ------------------------------------------------------------------
>  Foreign Scan on ft1  (actual time=0.79 5..4175.706 rows=1000000 loops=1)
>  Planning time: 0.060 ms
>  Execution time: 4276.043 ms

  master head  : avg = 4256.621,  std dev = 17.099
  patched pgfdw: avg = 4036.463,  std dev =  2.608

The patched version is faster by about 5%. This should be pure
result of asynchronous fetching, not including the effect of
early starting of remote execution in ExecInit.

Interestingly, as fetch_count gets larger, the gain raises in
spite of the decrease of the number of query sending.

  master head  : avg = 2622.759,  std dev = 38.379
  patched pgfdw: avg = 2277.622,  std dev = 27.269

About 15% gain. And for 10000,

  master head  : avg = 2000.980,  std dev =  6.434
  patched pgfdw: avg = 1616.793,  std dev = 13.192

19%.. It is natural that exec time reduces along with increase of
fetch size, but I haven't found the reason why the patch's gain
also increases.

======================

The second case is a simple join of two foreign tables sharing
one connection.

The master head runs this query in about 16 seconds with almost
no fluctuation among multiple tries.

> =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
>    FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
>                                       QUERY PLAN
> ----------------------------------------------------------------------------
>  Hash Join (actual time=7541.831..15924.631 rows=1000000 loops=1)
>    Hash Cond: (x.a = y.a)
>   ->  Foreign Scan on ft1 x (actual time=1.176..6553.480 rows=1000000 loops=1)
>   ->  Hash (actual time=7539.761..7539.761 rows=1000000 loops=1)
>        Buckets: 32768  Batches: 64  Memory Usage: 2829kB
>    ->  Foreign Scan on ft1 y (actual time=1.067..6529.165 rows=1000000 
> loops=1)
>  Planning time: 0.223 ms
>  Execution time: 15973.916 ms

But the v4 patch mysteriously accelerates this query, 6.5 seconds.

> =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
>    FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
>                                    QUERY PLAN
> ----------------------------------------------------------------------------
>  Hash Join (actual time=2556.977..5812.937 rows=1000000 loops=1)
>    Hash Cond: (x.a = y.a)
>   ->  Foreign Scan on ft1 x (actual time=32.689..1936.565 rows=1000000 
> loops=1)
>   ->  Hash (actual time=2523.810..2523.810 rows=1000000 loops=1)
>        Buckets: 32768  Batches: 64  Memory Usage: 2829kB
>    ->  Foreign Scan on ft1 y (actual time=50.345..1928.411 rows=1000000 
> loops=1)
>  Planning time: 0.220 ms
>  Execution time: 6512.043 ms

The result data seems not broken. I don't know the reason yet but
I'll investigate it.


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From edba0530fb6a9c5a4e6def055757d6d60bce9171 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v4

This is the modified version of Asynchronous execution of
postgres_fdw.

- Do async fetch more aggressively than v3.

- No additional tests yet :(
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 +++++++++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  61 ++++++++
 contrib/postgres_fdw/connection.c   |  82 ++++++-----
 contrib/postgres_fdw/postgres_fdw.c | 283 +++++++++++++++++++++++++++---------
 contrib/postgres_fdw/postgres_fdw.h |  15 +-
 6 files changed, 527 insertions(+), 116 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,200 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+struct pgfdw_conn
+{
+	PGconn *pgconn;				/* libpq connection for this connection */
+	int		nscans;				/* number of scans using this connection */
+	struct PgFdwScanState *async_scan; /* the connection currently running
+										* async query on this connection  */
+};
+
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn, 
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+void
+PFCfinish(PgFdwConn *conn)
+{
+	return PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..f695f5a
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;
+struct PgFdwScanState;
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int PFCisAsyncRunning(PgFdwConn *conn);
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int PFCgetNscans(PgFdwConn *conn);
+extern int PFCincrementNscans(PgFdwConn *conn);
+extern int PFCdecrementNscans(PgFdwConn *conn);
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+								const char *command,
+								int nParams,
+								const Oid *paramTypes,
+								const char *const * paramValues,
+								const int *paramLengths,
+								const int *paramFormats,
+								int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+							const char *stmtName, const char *query,
+							int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+								 const char *stmtName,
+								 int nParams,
+								 const char *const * paramValues,
+								 const int *paramLengths,
+								 const int *paramFormats,
+								 int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..2517f6b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,9 +161,12 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
+
 	}
 
+	PFCincrementNscans(entry->conn);
+
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
@@ -178,10 +181,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +226,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +249,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +266,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +315,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +351,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +382,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +411,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* ongoing async query should be canceled if no scans left */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_query(conn);
 }
 
 /*
@@ -429,7 +430,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +444,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +463,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +491,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +543,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +598,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +609,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -620,17 +621,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p",
+				 PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
 		}
 	}
 
@@ -676,6 +679,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan if running */
+		PFCcancelAsync(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +707,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..1dfb221 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -123,6 +123,12 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateRetrievedAttrs
 };
 
+typedef enum fetch_mode {
+	START_ONLY,
+	FORCE_SYNC,
+	ALLOW_ASYNC
+} fetch_mode;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
@@ -136,7 +142,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +162,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -167,7 +174,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +305,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +313,9 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
+static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +336,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +988,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.
+	 */
+	if (PFCgetNscans(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1015,10 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+	{
+		finish_async_query(fsstate->conn);
+		create_cursor(fsstate);
+	}
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1027,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate, ALLOW_ASYNC);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1087,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1392,19 +1410,22 @@ postgresExecForeignInsert(EState *estate,
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
+	/* Finish async query if runing */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1462,19 +1483,22 @@ postgresExecForeignUpdate(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										slot);
 
+	/* Finish async query if runing */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1532,19 +1556,22 @@ postgresExecForeignDelete(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										NULL);
 
+	/* Finish async query if runing */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1621,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1753,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1863,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1879,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1944,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +2011,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						 NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,55 +2027,128 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * Start async scan if this is the first scan. See fetch_more_data() for
+	 * details
+	 */
+	if (PFCgetNscans(conn) == 1)
+		fetch_more_data(fsstate, START_ONLY);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch. Some tuples left unread when asynchronous fetching is
+	 * interrupted. Don't flush to preserve the unread tuples for the case. It
+	 * occurs no more than twice successively.
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
-		int			numrows;
+		int			numrows, addrows, restrows;
+		HeapTuple  *tmptuples;
 		int			i;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
+		fetch_size = 10000;
 
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		if (PFCisAsyncRunning(conn))
+		{
+			Assert (cmd != START_ONLY);
+
+			/*
+			 * If the target fsstate is different from the scan state that the
+			 * current async fetch running for, the result should be stored
+			 * into it, then synchronously fetch data for the target fsstate.
+			 */
+			if (fsstate != PFCgetAsyncScan(conn))
+			{
+				fetch_more_data(PFCgetAsyncScan(conn), FORCE_SYNC);
+				res = PFCexec(conn, sql);
+			}
+			else
+			{
+				/* Get result of running async fetch */
+				res = PFCgetResult(conn);
+				if (PQntuples(res) == fetch_size)
+				{
+					/*
+					 * Connection state doesn't go to IDLE even if all data
+					 * has been sent to client for asynchronous query. One
+					 * more PQgetResult() is needed to reset the state to
+					 * IDLE.  See PQexecFinish() for details.
+					 */
+					if (PFCgetResult(conn) != NULL)
+						elog(ERROR, "Connection status error.");
+				}
+			}
+			PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			if (cmd == START_ONLY)
+			{
+				Assert(PFCgetNscans(conn) == 1);
+
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Elsewise do synchronous query execution */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res &&  PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
+		/* allocate tuple storage */
+		tmptuples = fsstate->tuples;
+		addrows = PQntuples(res);
+		restrows = fsstate->num_tuples - fsstate->next_tuple;
+		numrows = restrows + addrows;
 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+
+		Assert(restrows == 0 || tmptuples);
+
+		/* copy unread tuples if any */
+		for (i = 0 ; i < restrows ; i++)
+			fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i];
+
 		fsstate->num_tuples = numrows;
 		fsstate->next_tuple = 0;
 
-		for (i = 0; i < numrows; i++)
+		/* Convert the data into HeapTuples */
+		for (i = 0 ; i < addrows; i++)
 		{
-			fsstate->tuples[i] =
+			fsstate->tuples[restrows + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -2066,6 +2165,23 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		if (cmd == ALLOW_ASYNC)
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next bunch of tuples if
+				 * we're on asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				PFCsetAsyncScan(conn, fsstate);
+			}
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here but needed to make compiler quiet. */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2195,28 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Force cancelling async command state.
+ */
+void
+finish_async_query(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = PFCgetAsyncScan(conn);
+	PgFdwConn *async_conn;
+
+	/* Nothing to do if no async connection */
+	if (fsstate == NULL) return;
+	async_conn = fsstate->conn;
+	if (!async_conn ||
+		PFCgetNscans(async_conn) == 1 ||
+		!PFCisAsyncRunning(async_conn))
+		return;
+
+	fetch_more_data(PFCgetAsyncScan(async_conn), FORCE_SYNC);
+
+	Assert(!PFCisAsyncRunning(async_conn));
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2270,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2281,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2165,6 +2303,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 			 GetPrepStmtNumber(fmstate->conn));
 	p_name = pstrdup(prep_name);
 
+	/* Finish async query if runing */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * We intentionally do not specify parameter types here, but leave the
 	 * remote server to derive them by default.  This avoids possible problems
@@ -2175,11 +2316,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2438,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2470,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2520,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2594,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2723,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2756,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2769,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2864,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..b117a88 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,22 @@
 #include "nodes/relation.h"
 #include "utils/relcache.h"
 
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void finish_async_query(PgFdwConn *fsstate);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to