Hello. This is the new version of the FDW asynchronous execution feature.

The status of this feature, as of the last commitfest, is as follows.

- Async execution is valuable to have.
- But giving the first kick in the ExecInit phase is wrong.

So the design outline of this version is as follows.

- The patch set consists of three parts. The first is the
  infrastructure on the core side, the second is the code to
  enable asynchronous execution in postgres_fdw, and the third is
  a set of three alternative methods to adapt the fetch size,
  which makes asynchronous execution more effective.

- When to give the first kick for async execution was a
  problem. It should not be in the ExecInit phase, and the
  ExecProc phase does not fit either. An extra phase such as
  ExecPreProc would be too invasive, so I tried a "pre-exec
  callback".

  Any node can register callbacks during its initialization, and
  the registered callbacks are called just before the ExecProc
  phase in the executor. The first patch adds the functions and
  structs to enable this. A rough usage sketch follows.
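
  The sketch below shows how an FDW might use the new API. Only
  RegisterPreExecCallback and the callback signature come from the
  patch; my_fdw_* and start_remote_scan_async() are illustrative
  names, not part of the patches.

    /* Called by the executor just before the ExecProc phase. */
    static void
    my_fdw_pre_exec_callback(EState *estate, Node *node)
    {
        ForeignScanState *fss = (ForeignScanState *) node;

        /* Kick the remote query before the first ExecProcNode call. */
        start_remote_scan_async(fss->fdw_state);    /* hypothetical */
    }

    static void
    my_fdw_BeginForeignScan(ForeignScanState *node, int eflags)
    {
        /* ... usual ExecInit-time setup ... */

        /* Ask the executor to call back just before execution starts. */
        RegisterPreExecCallback(my_fdw_pre_exec_callback,
                                node->ss.ps.state, (Node *) node, NULL);
    }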

- The second part is unchanged from the previous version. It adds
  PgFdwConn, an extended PGconn that has some members to support
  asynchronous execution.

  Asynchronous execution is kicked only for the first ForeignScan
  node on a given foreign server, and that state lasts until the
  next scan comes. This behavior is mainly controlled in
  fetch_more_data(), and it limits the number of simultaneous
  executions per foreign server to one. I chose this behavior
  because no reasonable method to limit the multiplicity of
  execution on a *single peer* has been found so far. A condensed
  sketch of the rule follows.
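
  Condensed from postgresBeginForeignScan and fetch_more_data() in
  the second patch (not a verbatim excerpt), the rule is roughly:

    /* In postgresBeginForeignScan: only the first scan on a
     * connection registers the pre-exec kick. */
    if (PFCgetNscans(fsstate->conn) == 1)
        RegisterPreExecCallback(postgresPreExecCallback, estate,
                                (Node *) node, NULL);

    /* In fetch_more_data(): after consuming a batch, issue the next
     * FETCH asynchronously while no other scan needs the connection. */
    if (cmd == ALLOW_ASYNC && !fsstate->eof_reached)
    {
        if (!PFCsendQuery(conn, sql))
            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
        PFCsetAsyncScan(conn, fsstate);
    }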

- The third part consists of three trial implementations of the
  adaptive fetch size feature.

   The first one is duration-based adaptation. The patch increases
   the fetch size with every FETCH execution but tries to keep the
   duration of each FETCH below 500 ms. It does not look promising,
   though, because the behavior is very unstable and nearly
   unforeseeable. The adjustment rule is sketched below.
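
   The adjustment in 0003a boils down to the following, condensed
   from fetch_more_data() in that patch (the constants are defined
   there):

     new_fetch_size = fsstate->fetch_size;

     /* Shrink if the last batch needed more buffer than allowed. */
     if (fsstate->last_buf_size > MAX_FETCH_BUFFER_SIZE)
         new_fetch_size = (int) ((double) fsstate->fetch_size *
                        MAX_FETCH_BUFFER_SIZE / fsstate->last_buf_size);

     /* Halve it if the previous FETCH is still running after 500 ms. */
     if (PFCisBusy(conn) && fsstate->fetch_size > MIN_FETCH_SIZE &&
         fsstate->last_fetch_req_at + MAX_FETCH_DURATION < current_time)
         new_fetch_size = Min(new_fetch_size, fsstate->fetch_size / 2);

     /* Double it after enough successive successful async fetches. */
     if (new_fetch_size == fsstate->fetch_size &&
         fsstate->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD &&
         fsstate->fetch_size < MAX_FETCH_SIZE)
         new_fetch_size *= 2;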

   The second one is based on a byte-based FETCH feature. This
   patch adds an argument to the FETCH command that limits the
   number of bytes (octets) to send. But this might over-expose
   the internals: the size is counted based on the server's
   internal representation of a tuple, and the client has to send
   the per-tuple overhead of its own internal representation in
   bytes. This is effective but quite ugly. An example of the
   resulting command is shown below.
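
   With 0003b applied, the FETCH that postgres_fdw issues looks
   roughly like the following (the numbers are only illustrative):

     FETCH 10000 LIMIT 444288 (64) FROM c1

   That is, fetch at most 10000 rows, but stop once about 444288
   bytes have been accumulated, assuming 64 bytes of client-side
   overhead per tuple.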

   The third is the simplest and most straightforward way: it adds
   a foreign table option to specify the fetch_size. Its effect is
   also in doubt, since the size of the tuples for one foreign
   table varies with the list of returned columns, but it is
   foreseeable for users and is a necessary knob for those who
   want to tune it. A foreign server could also have the same
   option as the default for its foreign tables, but this patch
   has not added that. A usage example is shown below.
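
   For illustration, the option would be set like this (my guess
   at the exact usage, not a quote from 0003c):

     ALTER FOREIGN TABLE remote_tbl OPTIONS (ADD fetch_size '1000');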


The attached patches are the following:

- 0001-Add-infrastructure-of-pre-execution-callbacks.patch
  Infrastructure of pre-execution callback

- 0002-Allow-asynchronous-remote-query-of-postgres_fdw.patch
  FDW asynchronous execution feature

- 0003a-Add-experimental-POC-adaptive-fetch-size-feature.patch
  Adaptive fetch size alternative 1: duration-based control

- 0003b-POC-Experimental-fetch_by_size-feature.patch
  Adaptive fetch size alternative 2: FETCH by size

- 0003c-Add-foreign-table-option-to-set-fetch-size.patch
  Adaptive fetch size alternative 3: Foreign table option.

regards,
  
>From eb621897d1410079c6458bc4d1914d1345eb77bc Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 26 Jun 2015 15:12:16 +0900
Subject: [PATCH 1/3] Add infrastructure of pre-execution callbacks.

Some executor nodes have work to do before plan tree execution.
This infrastructure provides a callback mechanism for that.
---
 src/backend/executor/execMain.c  | 32 ++++++++++++++++++++++++++++++++
 src/backend/executor/execUtils.c |  2 ++
 src/include/nodes/execnodes.h    | 22 ++++++++++++++++++++++
 3 files changed, 56 insertions(+)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index a1561ce..51a86b2 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -764,6 +764,35 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
 }
 
+/*
+ * Register callbacks to be called just before plan execution.
+ */
+void
+RegisterPreExecCallback(PreExecCallback callback, EState *es, Node *nd,
+						void *arg)
+{
+	PreExecCallbackItem *item;
+
+	item = (PreExecCallbackItem*)
+		MemoryContextAlloc(es->es_query_cxt, sizeof(PreExecCallbackItem));
+	item->callback = callback;
+	item->node = nd;
+	item->arg = arg;
+
+	/* add the new node at the end of the chain */
+	item->next = es->es_preExecCallbacks;
+	es->es_preExecCallbacks = item;	
+}
+
+/* Execute registered pre-exec callbacks */
+void
+RunPreExecCallbacks(EState *es)
+{
+	PreExecCallbackItem *item;
+
+	for (item = es->es_preExecCallbacks ; item ; item = item->next)
+		item->callback(es, item->node);
+}
 
 /* ----------------------------------------------------------------
  *		InitPlan
@@ -956,6 +985,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 	 */
 	planstate = ExecInitNode(plan, estate, eflags);
 
+	/* Execute pre-execution callbacks registered during ExecInitNode */
+	RunPreExecCallbacks(estate);
+
 	/*
 	 * Get the tuple descriptor describing the type of tuples to return.
 	 */
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 3c611b9..e80bc22 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -123,6 +123,8 @@ CreateExecutorState(void)
 
 	estate->es_rowMarks = NIL;
 
+	estate->es_preExecCallbacks = NULL;
+
 	estate->es_processed = 0;
 	estate->es_lastoid = InvalidOid;
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 541ee18..cb8d854 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,26 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+struct EState;
+
+/* ----------------
+ *	  Pre-execute callbacks
+ * ----------------
+ */
+typedef void (*PreExecCallback) (struct EState *estate, Node *node);
+typedef struct PreExecCallbackItem
+{
+	struct PreExecCallbackItem *next;
+	PreExecCallback callback;		/* function to call just before execution
+									 * starts */
+	Node *node;						/* node to process  */
+	void *arg;						/* any extra arguments */
+} PreExecCallbackItem;
+
+void RegisterPreExecCallback(PreExecCallback callback, struct EState *es,
+							 Node *nd, void *arg);
+void RunPreExecCallbacks(struct EState *es);
+
 /* ----------------
  *	  EState information
  *
@@ -387,6 +407,8 @@ typedef struct EState
 
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
+	PreExecCallbackItem	 *es_preExecCallbacks; /* pre-exec callbacks */
+
 	uint32		es_processed;	/* # of tuples processed */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
-- 
1.8.3.1

>From 8db5a4992cf0509b6c9f93d659a3ba3644f30fa9 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 26 Jun 2015 16:54:39 +0900
Subject: [PATCH 2/3] Allow asynchronous remote query of postgres_fdw.

The new type PgFdwConn makes connection.c aware of a running
asynchronous query.

The first node on a connection is invoked prior to ExecProcNode. In
addition, fetch_more_data() tries to keep fetching asynchronously as
long as no other query starts running on the same connection, using
the async-aware functions of PgFdwConn.
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 ++++++++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  61 ++++++++
 contrib/postgres_fdw/connection.c   |  81 +++++-----
 contrib/postgres_fdw/postgres_fdw.c | 294 +++++++++++++++++++++++++++---------
 contrib/postgres_fdw/postgres_fdw.h |  15 +-
 6 files changed, 538 insertions(+), 115 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,200 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+struct pgfdw_conn
+{
+	PGconn *pgconn;				/* libpq connection for this connection */
+	int		nscans;				/* number of scans using this connection */
+	struct PgFdwScanState *async_scan; /* the scan currently running an
+										* async query on this connection */
+};
+
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn, 
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+void
+PFCfinish(PgFdwConn *conn)
+{
+	PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..f695f5a
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;
+struct PgFdwScanState;
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int PFCisAsyncRunning(PgFdwConn *conn);
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int PFCgetNscans(PgFdwConn *conn);
+extern int PFCincrementNscans(PgFdwConn *conn);
+extern int PFCdecrementNscans(PgFdwConn *conn);
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+								const char *command,
+								int nParams,
+								const Oid *paramTypes,
+								const char *const * paramValues,
+								const int *paramLengths,
+								const int *paramFormats,
+								int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+							const char *stmtName, const char *query,
+							int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+								 const char *stmtName,
+								 int nParams,
+								 const char *const * paramValues,
+								 const int *paramLengths,
+								 const int *paramFormats,
+								 int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 1a1e5b5..790b675 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,9 +161,11 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
 	}
 
+	PFCincrementNscans(entry->conn);
+
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
@@ -178,10 +180,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn	   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +225,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +248,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +265,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +314,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +350,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +381,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +410,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* ongoing async query should be canceled if no scans left */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_query(conn);
 }
 
 /*
@@ -429,7 +429,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +443,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +462,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +490,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +542,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -568,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -600,7 +600,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -611,7 +611,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -623,17 +623,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p",
+				 PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
 		}
 	}
 
@@ -679,6 +681,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan if running */
+		PFCcancelAsync(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -704,7 +709,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 6da01e1..40cac3b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -22,6 +22,7 @@
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/execnodes.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/cost.h"
@@ -124,6 +125,13 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateRetrievedAttrs
 };
 
+typedef enum fetch_mode {
+	START_ONLY,
+	FORCE_SYNC,
+	ALLOW_ASYNC,
+	EXIT_ASYNC
+} fetch_mode;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
@@ -137,7 +145,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -157,6 +165,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -168,7 +177,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -299,7 +308,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -307,9 +316,9 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *fsstate);
+static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -877,6 +886,21 @@ postgresGetForeignPlan(PlannerInfo *root,
 							NIL /* no custom tlist */ );
 }
 
+/* call back function to kick the query to start on remote */
+static void
+postgresPreExecCallback(EState *estate, Node *node)
+{
+	PgFdwScanState *fsstate =
+		(PgFdwScanState *)((ForeignScanState *)node)->fdw_state;
+
+	create_cursor(fsstate);
+	/*
+	 * Start async scan if this is the first scan. See fetch_more_data() for
+	 * details
+	 */
+	fetch_more_data(fsstate, START_ONLY);
+}
+
 /*
  * postgresBeginForeignScan
  *		Initiate an executor scan of a foreign PostgreSQL table.
@@ -988,6 +1012,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Register this node to be asynchronously executed if this is the first
+	 * scan on this connection
+	 */
+	if (PFCgetNscans(fsstate->conn) == 1)
+		RegisterPreExecCallback(postgresPreExecCallback, estate,
+								(Node*)node, NULL);
 }
 
 /*
@@ -1006,7 +1040,10 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+	{
+		finish_async_query(fsstate->conn);
+		create_cursor(fsstate);
+	}
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1015,7 +1052,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate, ALLOW_ASYNC);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1075,7 +1112,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1411,19 +1448,22 @@ postgresExecForeignInsert(EState *estate,
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1481,19 +1521,22 @@ postgresExecForeignUpdate(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1551,19 +1594,22 @@ postgresExecForeignDelete(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										NULL);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1613,7 +1659,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1745,7 +1791,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1855,7 +1901,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1871,7 +1917,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1936,13 +1982,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn   *conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -2004,8 +2049,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2026,54 +2071,121 @@ create_cursor(ForeignScanState *node)
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch for the case other than exiting from async mode.
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (cmd != EXIT_ASYNC)
+	{
+		fsstate->tuples = NULL;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
-		int			numrows;
+		int			numrows, addrows, restrows;
+		HeapTuple  *tmptuples;
 		int			i;
+		int			fetch_buf_size;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
 		fetch_size = 100;
 
+		/* Make the query to fetch tuples */
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		if (PFCisAsyncRunning(conn))
+		{
+			Assert (cmd != START_ONLY);
+
+			/*
+			 * If the target fsstate is different from the scan state that the
+			 * current async fetch is running for, store the result into that
+			 * state first, then synchronously fetch data for the target fsstate.
+			 */
+			if (fsstate != PFCgetAsyncScan(conn))
+			{
+				fetch_more_data(PFCgetAsyncScan(conn), EXIT_ASYNC);
+				res = PFCexec(conn, sql);
+			}
+			else
+			{
+				/* Get result of running async fetch */
+				res = PFCgetResult(conn);
+				if (PQntuples(res) == fetch_size)
+				{
+					/*
+					 * Connection state doesn't go to IDLE even if all data
+					 * has been sent to client for asynchronous query. One
+					 * more PQgetResult() is needed to reset the state to
+					 * IDLE.  See PQexecFinish() for details.
+					 */
+					if (PFCgetResult(conn) != NULL)
+						elog(ERROR, "Connection status error.");
+				}
+			}
+			PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			if (cmd == START_ONLY)
+			{
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Elsewise do synchronous query execution */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res &&  PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+		/* allocate tuple storage */
+		tmptuples = fsstate->tuples;
+		addrows = PQntuples(res);
+		restrows = fsstate->num_tuples - fsstate->next_tuple;
+		numrows = restrows + addrows;
+		fetch_buf_size = numrows * sizeof(HeapTuple);
+		fsstate->tuples = (HeapTuple *) palloc0(fetch_buf_size);
+
+		Assert(restrows == 0 || tmptuples);
+
+		/* copy unread tuples if any */
+		for (i = 0 ; i < restrows ; i++)
+			fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i];
+
 		fsstate->num_tuples = numrows;
 		fsstate->next_tuple = 0;
 
-		for (i = 0; i < numrows; i++)
+		/* Convert the data into HeapTuples */
+		for (i = 0 ; i < addrows; i++)
 		{
-			fsstate->tuples[i] =
+			HeapTuple tup =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
 										   fsstate->retrieved_attrs,
 										   fsstate->temp_cxt);
+			fsstate->tuples[restrows + i] = tup;
+			fetch_buf_size += (HEAPTUPLESIZE + tup->t_len);
 		}
 
 		/* Update fetch_ct_2 */
@@ -2085,6 +2197,23 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		if (cmd == ALLOW_ASYNC)
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next bunch of tuples if
+				 * we're on asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				PFCsetAsyncScan(conn, fsstate);
+			}
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here but needed to make compiler quiet. */
 	}
 	PG_CATCH();
 	{
@@ -2098,6 +2227,28 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Force cancelling async command state.
+ */
+void
+finish_async_query(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = PFCgetAsyncScan(conn);
+	PgFdwConn *async_conn;
+
+	/* Nothing to do if no async connection */
+	if (fsstate == NULL) return;
+	async_conn = fsstate->conn;
+	if (!async_conn ||
+		PFCgetNscans(async_conn) == 1 ||
+		!PFCisAsyncRunning(async_conn))
+		return;
+
+	fetch_more_data(PFCgetAsyncScan(async_conn), EXIT_ASYNC);
+
+	Assert(!PFCisAsyncRunning(async_conn));
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2151,7 +2302,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2162,7 +2313,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2184,6 +2335,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 			 GetPrepStmtNumber(fmstate->conn));
 	p_name = pstrdup(prep_name);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * We intentionally do not specify parameter types here, but leave the
 	 * remote server to derive them by default.  This avoids possible problems
@@ -2194,11 +2348,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2316,7 +2470,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2348,7 +2502,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2398,7 +2552,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2442,7 +2596,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2472,7 +2626,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2600,7 +2754,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn  *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2633,7 +2787,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2646,7 +2800,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2741,7 +2895,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3835ddb..c87e5cf 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,22 @@
 #include "nodes/relation.h"
 #include "utils/relcache.h"
 
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void finish_async_query(PgFdwConn *fsstate);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
1.8.3.1

>From 307209588737de34573d39d2b2376ce6f689a0f6 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 26 Jun 2015 17:31:26 +0900
Subject: [PATCH 3/3] Add experimental (POC) adaptive fetch size feature.

---
 contrib/postgres_fdw/postgres_fdw.c | 114 ++++++++++++++++++++++++++++++++++--
 1 file changed, 108 insertions(+), 6 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 40cac3b..108b4ba 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -48,6 +48,27 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Fetch size at startup. This might be better as a GUC parameter */
+#define MIN_FETCH_SIZE 100
+
+/* Maximum fetch size. This might be better as a GUC parameter */
+#define MAX_FETCH_SIZE 1000
+
+/*
+ * Maximum size for fetch buffer in kilobytes. Ditto.
+ *
+ * This should be far larger than sizeof(HeapTuple) * MAX_FETCH_SIZE. This is
+ * not a hard limit because we cannot know in advance the average row length
+ * returned.
+ */
+#define MAX_FETCH_BUFFER_SIZE 10000	/* 10MB */
+
+/* Maximum duration allowed for a single fetch, in milliseconds */
+#define MAX_FETCH_DURATION 500
+
+/* Number of successive async fetches to enlarge fetch_size */
+#define INCREASE_FETCH_SIZE_THRESHOLD 8
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -157,6 +178,12 @@ typedef struct PgFdwScanState
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
 	int			num_tuples;		/* # of tuples in array */
 	int			next_tuple;		/* index of next one to return */
+	int			fetch_size;		/* rows to be fetched at once */
+	int			successive_async; /* # of successive fetches at this
+                                    fetch_size */
+	long		last_fetch_req_at;  /* The time of the last fetch request, in
+									 * milliseconds*/
+	int			last_buf_size;	/* Buffer size required for the last fetch */
 
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
@@ -886,6 +913,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 							NIL /* no custom tlist */ );
 }
 
+
 /* call back function to kick the query to start on remote */
 static void
 postgresPreExecCallback(EState *estate, Node *node)
@@ -1015,6 +1043,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 
 	fsstate->econtext = node->ss.ps.ps_ExprContext;
 
+	fsstate->fetch_size = MIN_FETCH_SIZE;
+	fsstate->successive_async = 0;
+	fsstate->last_buf_size = 0;
+
 	/*
 	 * Register this node to be asynchronously executed if this is the first
 	 * scan on this connection
@@ -2092,18 +2124,72 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 	{
 		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
 		int			numrows, addrows, restrows;
 		HeapTuple  *tmptuples;
+		int			prev_fetch_size = fsstate->fetch_size;
+		int 		new_fetch_size = fsstate->fetch_size;
 		int			i;
+		struct timeval tv = {0, 0};
+		long		current_time;
 		int			fetch_buf_size;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
+		gettimeofday(&tv, NULL);
+		current_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;
+
+		/*
+		 * Calculate adaptive fetch size
+		 *
+		 * Calculate fetch_size based on maximal allowed duration and buffer
+		 * space. The fetch buffer size shouldn't be enormous so we try to
+		 * keep it under MAX_FETCH_BUFFER_SIZE.
+		 */
+
+		/* Decrease fetch_size if the previous required buffer size exceeded
+		 * MAX_FETCH_BUFFER_SIZE.*/
+		if (fsstate->last_buf_size > MAX_FETCH_BUFFER_SIZE)
+		{
+			new_fetch_size =
+				(int)((double)fsstate->fetch_size * MAX_FETCH_BUFFER_SIZE /
+					  fsstate->last_buf_size);
+		}
+		/*
+		 * Halve fetch_size if the duration of the last fetch was too
+		 * long.
+		 */
+		if (PFCisBusy(conn) &&
+			fsstate->fetch_size > MIN_FETCH_SIZE &&
+			fsstate->last_fetch_req_at + MAX_FETCH_DURATION <
+			current_time)
+		{
+			int tmp_fetch_size = fsstate->fetch_size / 2;
+			if (tmp_fetch_size < new_fetch_size)
+				new_fetch_size = tmp_fetch_size;
+		}
+
+		/*
+		 * Double fetch_size if it has not been decreased so far and the
+		 * other conditions are met.
+		 */
+		if (new_fetch_size == fsstate->fetch_size &&
+			fsstate->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD &&
+			fsstate->fetch_size < MAX_FETCH_SIZE)
+			new_fetch_size *= 2;
+
+		/* Change fetch_size as calculated above */
+		if (new_fetch_size != fsstate->fetch_size)
+		{
+			if (new_fetch_size > MAX_FETCH_SIZE)
+				fsstate->fetch_size = MAX_FETCH_SIZE;
+			else if (new_fetch_size < MIN_FETCH_SIZE)
+				fsstate->fetch_size = MIN_FETCH_SIZE;
+			else
+				fsstate->fetch_size = new_fetch_size;
+			fsstate->successive_async = 0;
+		}
 
 		/* Make the query to fetch tuples */
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+				 fsstate->fetch_size, fsstate->cursor_number);
 
 		if (PFCisAsyncRunning(conn))
 		{
@@ -2123,7 +2209,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			{
 				/* Get result of running async fetch */
 				res = PFCgetResult(conn);
-				if (PQntuples(res) == fetch_size)
+				if (PQntuples(res) == prev_fetch_size)
 				{
 					/*
 					 * Connection state doesn't go to IDLE even if all data
@@ -2144,6 +2230,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 				if (!PFCsendQuery(conn, sql))
 					pgfdw_report_error(ERROR, res, conn, false,
 									   fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
 
 				PFCsetAsyncScan(conn, fsstate);
 				goto end_of_fetch;
@@ -2188,12 +2275,14 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			fetch_buf_size += (HEAPTUPLESIZE + tup->t_len);
 		}
 
+		fsstate->last_buf_size = fetch_buf_size / 1024; /* in kilobytes */
+
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
 
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+		fsstate->eof_reached = (numrows < prev_fetch_size);
 
 		PQclear(res);
 		res = NULL;
@@ -2208,6 +2297,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 				 */
 				if (!PFCsendQuery(conn, sql))
 					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
 				PFCsetAsyncScan(conn, fsstate);
 			}
 		}
@@ -2223,6 +2313,18 @@ end_of_fetch:
 	}
 	PG_END_TRY();
 
+	if (PFCisAsyncRunning(fsstate->conn))
+	{
+		if (fsstate->successive_async < INCREASE_FETCH_SIZE_THRESHOLD)
+			fsstate->successive_async++;
+	}
+	else
+	{
+		/* Reset fetch_size if the async_fetch stopped */
+		fsstate->successive_async = 0;
+		fsstate->fetch_size = MIN_FETCH_SIZE;
+	}
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
-- 
1.8.3.1

>From e2cc7054bbab06c631d7c78491cb52143a4e47f9 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 29 Jun 2015 16:51:12 +0900
Subject: [PATCH] POC: Experimental fetch_by_size feature

---
 contrib/auto_explain/auto_explain.c             |   8 +-
 contrib/pg_stat_statements/pg_stat_statements.c |   8 +-
 contrib/postgres_fdw/postgres_fdw.c             |  92 +++++++++++++++-----
 src/backend/access/common/heaptuple.c           |  42 +++++++++
 src/backend/commands/copy.c                     |   2 +-
 src/backend/commands/createas.c                 |   2 +-
 src/backend/commands/explain.c                  |   2 +-
 src/backend/commands/extension.c                |   2 +-
 src/backend/commands/matview.c                  |   2 +-
 src/backend/commands/portalcmds.c               |   4 +-
 src/backend/commands/prepare.c                  |   2 +-
 src/backend/executor/execMain.c                 |  39 +++++++--
 src/backend/executor/execUtils.c                |   1 +
 src/backend/executor/functions.c                |   2 +-
 src/backend/executor/spi.c                      |   4 +-
 src/backend/parser/gram.y                       |  65 ++++++++++++++
 src/backend/tcop/postgres.c                     |   2 +
 src/backend/tcop/pquery.c                       | 109 +++++++++++++++++-------
 src/include/access/htup_details.h               |   2 +
 src/include/executor/executor.h                 |   8 +-
 src/include/nodes/execnodes.h                   |   1 +
 src/include/nodes/parsenodes.h                  |   2 +
 src/include/tcop/pquery.h                       |   7 +-
 src/interfaces/ecpg/preproc/ecpg.addons         |  83 ++++++++++++++++++
 24 files changed, 409 insertions(+), 82 deletions(-)

diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..f121a33 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -57,7 +57,7 @@ void		_PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
 					ScanDirection direction,
-					long count);
+					long count, long size);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nesting_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nesting_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 0eb991c..593d406 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -289,7 +289,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction,
-				 long count);
+				 long count, long size);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(Node *parsetree, const char *queryString,
@@ -870,15 +870,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nested_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nested_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 40cac3b..0419cde 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -48,6 +48,11 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Maximum tuples per fetch */
+#define MAX_FETCH_SIZE				10000
+
+/* Maximum memory usable for retrieved data  */
+#define MAX_FETCH_MEM				(512 * 1024)
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -166,6 +171,8 @@ typedef struct PgFdwScanState
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
+	long		max_palloced_mem; /* For test, remove me later */
+	int			max_numrows;
 } PgFdwScanState;
 
 /*
@@ -331,6 +338,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 							  double *totaldeadrows);
 static void analyze_row_processor(PGresult *res, int row,
 					  PgFdwAnalyzeState *astate);
+static Size estimate_tuple_overhead(TupleDesc tupDesc,
+									List *retrieved_attrs);
 static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   int row,
 						   Relation rel,
@@ -1138,6 +1147,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	if (fsstate == NULL)
 		return;
 
+	elog(LOG, "Max memory for tuple store = %ld, max numrows = %d", fsstate->max_palloced_mem, fsstate->max_numrows);
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
 		close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2092,18 +2102,20 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 	{
 		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
+		int			fetch_mem;
+		int			tuple_overhead;
 		int			numrows, addrows, restrows;
 		HeapTuple  *tmptuples;
 		int			i;
 		int			fetch_buf_size;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
-
-		/* Make the query to fetch tuples */
-		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+		tuple_overhead = estimate_tuple_overhead(fsstate->attinmeta->tupdesc,
+												 fsstate->retrieved_attrs);
+		fetch_mem = MAX_FETCH_MEM - MAX_FETCH_SIZE * sizeof(HeapTuple);
+		snprintf(sql, sizeof(sql), "FETCH %d LIMIT %d (%d) FROM c%u",
+				 MAX_FETCH_SIZE,
+				 fetch_mem, tuple_overhead,
+				 fsstate->cursor_number);
 
 		if (PFCisAsyncRunning(conn))
 		{
@@ -2123,17 +2135,15 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			{
 				/* Get result of running async fetch */
 				res = PFCgetResult(conn);
-				if (PQntuples(res) == fetch_size)
-				{
-					/*
-					 * Connection state doesn't go to IDLE even if all data
-					 * has been sent to client for asynchronous query. One
-					 * more PQgetResult() is needed to reset the state to
-					 * IDLE.  See PQexecFinish() for details.
-					 */
-					if (PFCgetResult(conn) != NULL)
-						elog(ERROR, "Connection status error.");
-				}
+
+				/*
+				 * Connection state doesn't go to IDLE even if all data
+				 * has been sent to client for asynchronous query. One
+				 * more PQgetResult() is needed to reset the state to
+				 * IDLE.  See PQexecFinish() for details.
+				 */
+				if (PFCgetResult(conn) != NULL)
+					elog(ERROR, "Connection status error.");
 			}
 			PFCsetAsyncScan(conn, NULL);
 		}
@@ -2161,6 +2171,8 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 		/* allocate tuple storage */
 		tmptuples = fsstate->tuples;
 		addrows = PQntuples(res);
+		if (fsstate->max_numrows < addrows)
+			fsstate->max_numrows = addrows;
 		restrows = fsstate->num_tuples - fsstate->next_tuple;
 		numrows = restrows + addrows;
 		fetch_buf_size = numrows * sizeof(HeapTuple);
@@ -2188,12 +2200,15 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 			fetch_buf_size += (HEAPTUPLESIZE + tup->t_len);
 		}
 
+		if (fsstate->max_palloced_mem < fetch_buf_size)
+			fsstate->max_palloced_mem = fetch_buf_size;
+
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
 
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+		/* Must be EOF if we have no new tuple here. */
+		fsstate->eof_reached = (addrows == 0);
 
 		PQclear(res);
 		res = NULL;
@@ -3007,6 +3022,43 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 }
 
 /*
+ * Compute the estimated overhead of the result tuples
+ * See heap_form_tuple for the details of this calculation.
+ */
+static Size
+estimate_tuple_overhead(TupleDesc tupDesc,
+						List *retrieved_attrs)
+{
+	Size size = 0;
+	int	 ncol = list_length(retrieved_attrs);
+	ListCell	*lc;
+
+	size += offsetof(HeapTupleHeaderData, t_bits);
+	size += BITMAPLEN(ncol);
+
+	if (tupDesc->tdhasoid)
+		size += sizeof(Oid);
+
+	size = MAXALIGN(size);
+
+	size += sizeof(Datum) * ncol;
+	size += sizeof(bool) * ncol;
+
+	foreach (lc, retrieved_attrs)
+	{
+		int i = lfirst_int(lc);
+
+		if (i > 0)
+		{
+			if (tupDesc->attrs[i - 1]->attbyval)
+				size -= (sizeof(Datum) - tupDesc->attrs[i - 1]->attlen);
+		}
+	}
+
+	return size;
+}
+
+/*
  * Create a tuple from the specified row of the PGresult.
  *
  * rel is the local representation of the foreign table, attinmeta is
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index 09aea79..17525b5 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -133,6 +133,48 @@ heap_compute_data_size(TupleDesc tupleDesc,
 	return data_length;
 }
 
+Size
+slot_compute_raw_data_size(TupleTableSlot *slot)
+{
+	TupleDesc tupleDesc = slot->tts_tupleDescriptor;
+	Datum *values = slot->tts_values;
+	bool  *isnull = slot->tts_isnull;
+	Size		data_length = 0;
+	int			i;
+	int			numberOfAttributes = tupleDesc->natts;
+	Form_pg_attribute *att = tupleDesc->attrs;
+
+	if (slot->tts_nvalid < tupleDesc->natts)
+		heap_deform_tuple(slot->tts_tuple, tupleDesc,
+						  slot->tts_values, slot->tts_isnull);
+
+	for (i = 0; i < numberOfAttributes; i++)
+	{
+		Datum		val;
+		Form_pg_attribute atti;
+
+		if (isnull[i])
+			continue;
+
+		val = values[i];
+		atti = att[i];
+
+		if (atti->attlen == -1)
+		{
+			data_length += toast_raw_datum_size(val);
+		}
+		else
+		{
+			data_length = att_align_datum(data_length, atti->attalign,
+										  atti->attlen, val);
+			data_length = att_addlength_datum(data_length, atti->attlen,
+											  val);
+		}
+	}
+
+	return data_length;
+}
+
 /*
  * heap_fill_tuple
  *		Load data portion of a tuple from values/isnull arrays
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8904676..463fc67 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1928,7 +1928,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L, 0);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 41183f6..7612391 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L, 0);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0d1ecc2..4480343 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -498,7 +498,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L, 0);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 2b1dcd0..bc116f9 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -733,7 +733,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0L, 0L, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 5492e59..39e29ba 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -363,7 +363,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 2794537..85fffc1 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -177,6 +177,8 @@ PerformPortalFetch(FetchStmt *stmt,
 	nprocessed = PortalRunFetch(portal,
 								stmt->direction,
 								stmt->howMany,
+								stmt->howLarge,
+								stmt->tupoverhead,
 								dest);
 
 	/* Return command status if wanted */
@@ -375,7 +377,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index fb33d30..46fe4f8 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, 0, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 51a86b2..5f0de97 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -79,6 +79,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
+			int  tupleOverhead,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -277,17 +279,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count)
+			ScanDirection direction, long count, long size, int tupoverhead)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count);
+		(*ExecutorRun_hook) (queryDesc, direction,
+							 count, size, tupoverhead);
 	else
-		standard_ExecutorRun(queryDesc, direction, count);
+		standard_ExecutorRun(queryDesc, direction,
+							 count, size, tupoverhead);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count)
+					 ScanDirection direction,
+					 long count, long size, int tupoverhead)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -339,6 +344,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					operation,
 					sendTuples,
 					count,
+					size,
+					tupoverhead,
 					direction,
 					dest);
 
@@ -1551,22 +1558,27 @@ ExecutePlan(EState *estate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
+			int  tupleOverhead,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
 	TupleTableSlot *slot;
 	long		current_tuple_count;
+	long		sent_size;
 
 	/*
 	 * initialize local variables
 	 */
 	current_tuple_count = 0;
-
+	sent_size = 0;
 	/*
 	 * Set the direction.
 	 */
 	estate->es_direction = direction;
 
+	estate->es_stoppedbysize = false;
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
@@ -1621,6 +1633,23 @@ ExecutePlan(EState *estate,
 		current_tuple_count++;
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
+
+		if (sizeTuples > 0)
+		{
+			/*
+			 * Count the size of tuples we've sent
+			 *
+			 * This needs all attributes deformed, so it can be a bit slow in some cases.
+			 */
+			sent_size += slot_compute_raw_data_size(slot) + tupleOverhead;
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (sizeTuples < sent_size)
+			{
+				estate->es_stoppedbysize = true;
+				break;
+			}
+		}
 	}
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index e80bc22..6b59c05 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -126,6 +126,7 @@ CreateExecutorState(void)
 	estate->es_preExecCallbacks = NULL;
 
 	estate->es_processed = 0;
+	estate->es_stoppedbysize = false;
 	estate->es_lastoid = InvalidOid;
 
 	estate->es_top_eflags = 0;
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index ce49c47..7ab2e67 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -853,7 +853,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L, 0);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index d544ad9..f29c3a8 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2399,7 +2399,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L, 0);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2477,7 +2477,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count,
+							  count, 0L, 0,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e0ff6f1..b7b061c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -538,6 +538,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 
+%type <ival>	opt_overhead
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -6066,6 +6068,16 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
+			| SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $1;
+					n->howLarge = $3;
+					n->tupoverhead = $4;
+					$$ = (Node *)n;
+				}
 			| ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6074,6 +6086,16 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| ALL LIMIT Iconst opt_overhead opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $3;
+					n->tupoverhead = $4;
+					$$ = (Node *)n;
+				}
 			| FORWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6090,6 +6112,16 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| FORWARD SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $7;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					n->tupoverhead = $5;
+					$$ = (Node *)n;
+				}
 			| FORWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6098,6 +6130,16 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| FORWARD ALL LIMIT Iconst  opt_overhead  opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $7;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					n->tupoverhead = $5;
+					$$ = (Node *)n;
+				}
 			| BACKWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6114,6 +6156,16 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| BACKWARD SignedIconst LIMIT Iconst  opt_overhead opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $7;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					n->tupoverhead = $5;
+					$$ = (Node *)n;
+				}
 			| BACKWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6122,6 +6174,16 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| BACKWARD ALL LIMIT Iconst  opt_overhead opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $7;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					n->tupoverhead = $5;
+					$$ = (Node *)n;
+				}
 		;
 
 from_in:	FROM									{}
@@ -6132,6 +6194,9 @@ opt_from_in:	from_in								{}
 			| /* EMPTY */							{}
 		;
 
+opt_overhead:	'(' Iconst ')'						{ $$ = $2;}
+			| /* EMPTY */							{ $$ = 0; }
+		;
 
 /*****************************************************************************
  *
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ce4bdaf..70641eb 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1103,6 +1103,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
+						 0L, 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1987,6 +1988,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
+						  0L, 0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 9c14e8a..ce9541a 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/htup_details.h"
 #include "commands/prepare.h"
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
@@ -39,9 +40,11 @@ static void ProcessQuery(PlannedStmt *plan,
 			 DestReceiver *dest,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
-static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
+static uint32 RunFromStore(Portal portal, ScanDirection direction,
+		     long count, long size, int tupoverhead, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count,
+static long PortalRunSelect(Portal portal, bool forward,
+				long count, long size, int tupoverhead,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -51,6 +54,8 @@ static void PortalRunMulti(Portal portal, bool isTopLevel,
 static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
+				 int tupoverehad,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -182,7 +187,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -703,8 +708,8 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
-		  DestReceiver *dest, DestReceiver *altdest,
+PortalRun(Portal portal, long count, long size, int tupoverhead,
+		  bool isTopLevel, DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
 	bool		result;
@@ -787,7 +792,8 @@ PortalRun(Portal portal, long count, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, dest);
+				nprocessed = PortalRunSelect(portal, true,
+											 count, size, tupoverhead, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -892,11 +898,14 @@ static long
 PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
+				long size,
+				int  tupoverhead,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
 	ScanDirection direction;
 	uint32		nprocessed;
+	bool		stoppedbysize;
 
 	/*
 	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
@@ -939,12 +948,15 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction,
+									  count, size, tupoverhead,
+									  &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size, tupoverhead);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -954,8 +966,9 @@ PortalRunSelect(Portal portal,
 
 			if (nprocessed > 0)
 				portal->atStart = false;		/* OK to go backward now */
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 				portal->atEnd = true;	/* we retrieved 'em all */
 			oldPos = portal->portalPos;
 			portal->portalPos += nprocessed;
@@ -982,12 +995,15 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction,
+									  count, size, tupoverhead,
+									  &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size, tupoverhead);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -998,8 +1014,9 @@ PortalRunSelect(Portal portal,
 				portal->atEnd = false;	/* OK to go forward now */
 				portal->portalPos++;	/* adjust for endpoint case */
 			}
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 			{
 				portal->atStart = true; /* we retrieved 'em all */
 				portal->portalPos = 0;
@@ -1088,11 +1105,15 @@ FillPortalStore(Portal portal, bool isTopLevel)
  * out for memory leaks.
  */
 static uint32
-RunFromStore(Portal portal, ScanDirection direction, long count,
-			 DestReceiver *dest)
+RunFromStore(Portal portal, ScanDirection direction,
+			 long count, long size_limit, int tupoverhead,
+			 bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
+	long			sent_size = 0;
+
+	*stoppedbysize = false;
 
 	slot = MakeSingleTupleTableSlot(portal->tupDesc);
 
@@ -1122,6 +1143,9 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 				break;
 
 			(*dest->receiveSlot) (slot, dest);
+			/* Count the size of tuples we've sent */
+			sent_size += slot_compute_raw_data_size(slot)
+				+ tupoverhead;
 
 			ExecClearTuple(slot);
 
@@ -1133,10 +1157,19 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			current_tuple_count++;
 			if (count && count == current_tuple_count)
 				break;
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (current_tuple_count > 0 &&
+				size_limit > 0 && size_limit < sent_size)
+			{
+				*stoppedbysize = true;
+				break;
+			}
 		}
 	}
 
 	(*dest->rShutdown) (dest);
+	elog(LOG, "Sent %ld bytes", sent_size);
 
 	ExecDropSingleTupleTableSlot(slot);
 
@@ -1385,6 +1418,8 @@ long
 PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
+			   int  tupoverhead,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1422,7 +1457,8 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection,
+										  count, size, tupoverhead, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1439,7 +1475,8 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection,
+										  count, size, tupoverhead, dest);
 				break;
 
 			default:
@@ -1484,6 +1521,8 @@ static long
 DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
+				 int  tupoverhead,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1526,7 +1565,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1,
+						PortalRunSelect(portal, true, count - 1, 0L, 0,
 										None_Receiver);
 				}
 				else
@@ -1536,13 +1575,15 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1,
+						PortalRunSelect(portal, false,
+										pos - count + 1, 0L, 0,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1,
+						PortalRunSelect(portal, true,
+										count - pos - 1, 0L, 0,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, 0, dest);
 			}
 			else if (count < 0)
 			{
@@ -1553,17 +1594,19 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
+				PortalRunSelect(portal, true,
+								FETCH_ALL, 0L, 0, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false,
+									-count - 1, 0, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, 0, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, 0, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1573,8 +1616,9 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, dest);
+					PortalRunSelect(portal, true,
+									count - 1, 0L, 0, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, 0, dest);
 			}
 			else if (count < 0)
 			{
@@ -1583,8 +1627,9 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false,
+									-count - 1, 0L, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, 0, dest);
 			}
 			else
 			{
@@ -1630,7 +1675,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, 0, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1652,7 +1697,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, dest);
+	return PortalRunSelect(portal, forward, count, size, tupoverhead, dest);
 }
 
 /*
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index 55d483d..5f0c8f3 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -20,6 +20,7 @@
 #include "access/transam.h"
 #include "storage/bufpage.h"
 
+#include "executor/tuptable.h"
 /*
  * MaxTupleAttributeNumber limits the number of (user) columns in a tuple.
  * The key limit on this value is that the size of the fixed overhead for
@@ -761,6 +762,7 @@ extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
 /* prototypes for functions in common/heaptuple.c */
 extern Size heap_compute_data_size(TupleDesc tupleDesc,
 					   Datum *values, bool *isnull);
+extern Size slot_compute_raw_data_size(TupleTableSlot *slot);
 extern void heap_fill_tuple(TupleDesc tupleDesc,
 				Datum *values, bool *isnull,
 				char *data, Size data_size,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 193a654..e2706a6 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -79,8 +79,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
-												   ScanDirection direction,
-												   long count);
+									   ScanDirection direction,
+									   long count, long size, int tupoverhead);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -175,9 +175,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count);
+		ScanDirection direction, long count, long size, int tupoverhead);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count);
+		 ScanDirection direction, long count, long size, int tupoverhead);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index cb8d854..f8121ec 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -410,6 +410,7 @@ typedef struct EState
 	PreExecCallbackItem	 *es_preExecCallbacks; /* pre-exec callbacks */
 
 	uint32		es_processed;	/* # of tuples processed */
+	bool		es_stoppedbysize; /* true if processing stopped by size */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 868905b..094c0ac 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2393,6 +2393,8 @@ typedef struct FetchStmt
 	NodeTag		type;
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
+	long		howLarge;		/* total bytes of rows */
+	int			tupoverhead;	/* declared overhead per tuple in client */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 8073a6e..021532c 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -17,7 +17,6 @@
 #include "nodes/parsenodes.h"
 #include "utils/portal.h"
 
-
 extern PGDLLIMPORT Portal ActivePortal;
 
 
@@ -33,13 +32,15 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, bool isTopLevel,
-		  DestReceiver *dest, DestReceiver *altdest,
+extern bool PortalRun(Portal portal, long count, long size, int tupoverhead,
+		  bool isTopLevel, DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
+			   int tupoverhead,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index b3b36cf..424f412 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -220,13 +220,56 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon
 ECPG: fetch_argsPRIORopt_from_incursor_name addon
 ECPG: fetch_argsFIRST_Popt_from_incursor_name addon
 ECPG: fetch_argsLAST_Popt_from_incursor_name addon
+		add_additional_variables($3, false);
+		if ($3[0] == ':')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
 ECPG: fetch_argsALLopt_from_incursor_name addon
+ECPG: fetch_argsFORWARDopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsALLLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($7, false);
+		if ($7[0] == ':')
+		{
+			free($7);
+			$7 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+		if ($5[0] == '$')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -234,11 +277,51 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
 		if ($1[0] == '$')
 		{
 			free($1);
 			$1 = mm_strdup("$0");
 		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($7, false);
+		if ($7[0] == ':')
+		{
+			free($7);
+			$7 = mm_strdup("$0");
+		}
+		if ($2[0] == '$')
+		{
+			free($2);
+			$2 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+		if ($5[0] == '$')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
1.8.3.1
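
As an illustration of the syntax added above (the cursor name and the
numbers here are only examples, not taken from the patch), a client
could limit a fetch both by row count and by size like this:

    -- Fetch at most 1000 rows, but stop early once the raw tuple data
    -- plus the declared 32-byte per-tuple overhead exceeds 65536 bytes.
    FETCH 1000 LIMIT 65536 (32) FROM c1;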

From 92596ee8cda043cbc37cd27d4ec668f624a49d6e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 30 Jun 2015 15:29:46 +0900
Subject: [PATCH] Add foreign table option to set fetch size.

---
 contrib/postgres_fdw/option.c       |  2 ++
 contrib/postgres_fdw/postgres_fdw.c | 31 +++++++++++++++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 7547ec2..793239d 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -153,6 +153,8 @@ InitPgFdwOptions(void)
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
+		/* fetch_size is available on table (XXX: the server could also have it) */
+		{"fetch_size", ForeignTableRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 40cac3b..e01213e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -152,6 +152,7 @@ typedef struct PgFdwScanState
 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
 	List	   *param_exprs;	/* executable expressions for param values */
 	const char **param_values;	/* textual values of query parameters */
+	int			fetch_size;		/* number of tuples to request by one fetch */
 
 	/* for storing result tuples */
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
@@ -945,6 +946,31 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
+	fsstate->fetch_size = -1;
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		/* Does anyone specify negatives? Who cares? */
+		if (strcmp(def->defname, "fetch_size") == 0)
+		{
+			char *ep = NULL;
+			char *defstr = defGetString(def);
+
+			fsstate->fetch_size = strtol(defstr, &ep, 10);
+			if (*ep || ep == defstr)
+			{
+				elog(WARNING,
+					 "Option \"%s\" must be a positive integer (foreign table \"%s\") : \"%s\"",
+					 def->defname, get_rel_name(table->relid), defstr);
+				fsstate->fetch_size = -1;		/* Use default */
+			}
+			break;
+		}
+	}
+	if (fsstate->fetch_size < 1)
+		fsstate->fetch_size = 100;		/* default size */
+
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
@@ -2092,15 +2118,12 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 	{
 		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
+		int			fetch_size = fsstate->fetch_size;
 		int			numrows, addrows, restrows;
 		HeapTuple  *tmptuples;
 		int			i;
 		int			fetch_buf_size;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
-
 		/* Make the query to fetch tuples */
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
-- 
1.8.3.1
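
With this third alternative the knob is set per foreign table, roughly
like the following (the table name and value are only placeholders):

    -- A value that does not parse as a positive integer draws a WARNING
    -- and the default of 100 rows per FETCH is used instead.
    ALTER FOREIGN TABLE remote_tbl OPTIONS (ADD fetch_size '500');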
