Hello, Andrey.

At Wed, 3 Jun 2020 15:00:06 +0500, Andrey Lepikhov <a.lepik...@postgrespro.ru> 
wrote in 
> This patch no longer applies cleanly.
> In addition, code comments contain spelling errors.

Sure. Thaks for noticing of them and sorry for the many typos.
Additional item in WaitEventIPC conflicted with this.


I found the following typos.

connection.c:
  s/Rerturns/Returns/
postgres-fdw.c:
  s/Retrive/Retrieve/
  s/ForeginScanState/ForeignScanState/
  s/manipuration/manipulation/
  s/asyncstate/async state/
  s/alrady/already/

nodeAppend.c:  
  s/Rery/Retry/

createplan.c:
  s/chidlren/children/

resowner.c:
  s/identier/identifier/ X 2

execnodes.h:
  s/sutff/stuff/

plannodes.h:
  s/asyncronous/asynchronous/
  
  
Removed a useless variable PgFdwScanState.result_ready.
Removed duplicate code from remove_async_node() by using move_to_next_waiter().
Done some minor cleanups.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From db231fa99da5954b52e195f6af800c0f9b991ed4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v3 1/3] Allow wait event set to be registered to resource
 owner

WaitEventSet needs to be released using resource owner for a certain
case. This change adds WaitEventSet reowner and allow the creator of a
WaitEventSet to specify a resource owner.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 +++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 7717bb2719..16aefb03ee 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -218,7 +218,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 05df5017c4..a8b52cd381 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -56,6 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -84,6 +85,8 @@ struct WaitEventSet
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
 
+	ResourceOwner	resowner;	/* Resource owner */
+
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
 	 * set is waiting for.
@@ -393,7 +396,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
 	int			ret = 0;
 	int			rc;
 	WaitEvent	event;
-	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
 	if (wakeEvents & WL_TIMEOUT)
 		Assert(timeout >= 0);
@@ -560,12 +563,15 @@ ResetLatch(Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
 	WaitEventSet *set;
 	char	   *data;
 	Size		sz = 0;
 
+	if (res)
+		ResourceOwnerEnlargeWESs(res);
+
 	/*
 	 * Use MAXALIGN size/alignment to guarantee that later uses of memory are
 	 * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -680,6 +686,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+	/* Register this wait event set if requested */
+	set->resowner = res;
+	if (res)
+		ResourceOwnerRememberWES(set->resowner, set);
+
 	return set;
 }
 
@@ -725,6 +736,9 @@ FreeWaitEventSet(WaitEventSet *set)
 	}
 #endif
 
+	if (set->resowner != NULL)
+		ResourceOwnerForgetWES(set->resowner, set);
+
 	pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 37b6a4eecd..fcc92138fe 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	{
 		WaitEventSet *new_event_set;
 
-		new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+		new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
 		AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
 						  MyLatch, NULL);
 		AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 8bc2c4e9ea..237ca9fa30 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
 	ResourceArray filearr;		/* open temporary files */
 	ResourceArray dsmarr;		/* dynamic shmem segments */
 	ResourceArray jitarr;		/* JIT contexts */
+	ResourceArray wesarr;		/* wait event sets */
 
 	/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
 	int			nlocks;			/* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
 	ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+	ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
 	return owner;
 }
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
 			jit_release_context(context);
 		}
+
+		/* Ditto for wait event sets */
+		while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+		{
+			WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+			if (isCommit)
+				PrintWESLeakWarning(event);
+			FreeWaitEventSet(event);
+		}
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
@@ -725,6 +738,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	Assert(owner->filearr.nitems == 0);
 	Assert(owner->dsmarr.nitems == 0);
 	Assert(owner->jitarr.nitems == 0);
+	Assert(owner->wesarr.nitems == 0);
 	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
 	/*
@@ -752,6 +766,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	ResourceArrayFree(&(owner->filearr));
 	ResourceArrayFree(&(owner->dsmarr));
 	ResourceArrayFree(&(owner->jitarr));
+	ResourceArrayFree(&(owner->wesarr));
 
 	pfree(owner);
 }
@@ -1370,3 +1385,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
 		elog(ERROR, "JIT context %p is not owned by resource owner %s",
 			 DatumGetPointer(handle), owner->name);
 }
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+	ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+	ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+	/*
+	 * XXXX: There's no property to show as an identier of a wait event set,
+	 * use its pointer instead.
+	 */
+	if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+		elog(ERROR, "wait event set %p is not owned by resource owner %s",
+			 events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+	/*
+	 * XXXX: There's no property to show as an identier of a wait event set,
+	 * use its pointer instead.
+	 */
+	elog(WARNING, "wait event set leak: %p still referenced",
+		 events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 46ae56cae3..b1b8375768 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
 extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+										ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int	AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
 							  Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index a781a7a2aa..7d19dadd57 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
 								   Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+						 WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+					   WaitEventSet *);
+
 #endif							/* RESOWNER_PRIVATE_H */
-- 
2.18.2

>From ced3307f27f01e657499ae6ef4436efaa5e350e5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v3 2/3] infrastructure for asynchronous execution

This patch add an infrastructure for asynchronous execution. As a PoC
this makes only Append capable to handle asynchronously executable
subnodes.
---
 src/backend/commands/explain.c          |  17 ++
 src/backend/executor/Makefile           |   1 +
 src/backend/executor/execAsync.c        | 152 +++++++++++
 src/backend/executor/nodeAppend.c       | 342 ++++++++++++++++++++----
 src/backend/executor/nodeForeignscan.c  |  21 ++
 src/backend/nodes/bitmapset.c           |  72 +++++
 src/backend/nodes/copyfuncs.c           |   3 +
 src/backend/nodes/outfuncs.c            |   3 +
 src/backend/nodes/readfuncs.c           |   3 +
 src/backend/optimizer/plan/createplan.c |  66 ++++-
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/postmaster/syslogger.c      |   2 +-
 src/backend/utils/adt/ruleutils.c       |   8 +-
 src/backend/utils/resowner/resowner.c   |   4 +-
 src/include/executor/execAsync.h        |  22 ++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeForeignscan.h  |   3 +
 src/include/foreign/fdwapi.h            |  11 +
 src/include/nodes/bitmapset.h           |   1 +
 src/include/nodes/execnodes.h           |  23 +-
 src/include/nodes/plannodes.h           |   9 +
 src/include/pgstat.h                    |   3 +-
 22 files changed, 705 insertions(+), 65 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index efd7201d61..708e9ed546 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -86,6 +86,7 @@ static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
 									   List *ancestors, ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 								   ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
 						  ExplainState *es);
 static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1389,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		}
 		if (plan->parallel_aware)
 			appendStringInfoString(es->str, "Parallel ");
+		if (plan->async_capable)
+			appendStringInfoString(es->str, "Async ");
 		appendStringInfoString(es->str, pname);
 		es->indent++;
 	}
@@ -1969,6 +1972,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Hash:
 			show_hash_info(castNode(HashState, planstate), es);
 			break;
+
+		case T_Append:
+			show_append_info(castNode(AppendState, planstate), es);
+			break;
+
 		default:
 			break;
 	}
@@ -2322,6 +2330,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 						 ancestors, es);
 }
 
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+	Append *plan = (Append *) astate->ps.plan;
+
+	if (plan->nasyncplans > 0)
+		ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
 /*
  * Show the grouping keys for an Agg node.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..1004647d4f 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
 	execAmi.o \
+	execAsync.o \
 	execCurrent.o \
 	execExpr.o \
 	execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..2b7d1877e0
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,152 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/*
+ * ExecAsyncConfigureWait: Add wait event to the WaitEventSet if needed.
+ *
+ * If reinit is true, the caller didn't reuse existing WaitEventSet.
+ */
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+					   void *data, bool reinit)
+{
+	switch (nodeTag(node))
+	{
+	case T_ForeignScanState:
+		return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+											 wes, data, reinit);
+		break;
+	default:
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(node));
+	}
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+	int **p_refind;
+	int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+	/* arg is the address of the variable refind in ExecAsyncEventWait */
+	ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+	*mcbarg->p_refind = NULL;
+	*mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+/*
+ * ExecAsyncEventWait:
+ *
+ * Wait for async events to fire. Returns the Bitmapset of fired events.
+ */
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+	static int *refind = NULL;
+	static int refindsize = 0;
+	WaitEventSet *wes;
+	WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+	int noccurred = 0;
+	Bitmapset *fired_events = NULL;
+	int i;
+	int n;
+
+	n = bms_num_members(waitnodes);
+	wes = CreateWaitEventSet(TopTransactionContext,
+							 TopTransactionResourceOwner, n);
+	if (refindsize < n)
+	{
+		if (refindsize == 0)
+			refindsize = EVENT_BUFFER_SIZE; /* XXX */
+		while (refindsize < n)
+			refindsize *= 2;
+		if (refind)
+			refind = (int *) repalloc(refind, refindsize * sizeof(int));
+		else
+		{
+			static ExecAsync_mcbarg mcb_arg =
+				{ &refind, &refindsize };
+			static MemoryContextCallback mcb =
+				{ ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+			MemoryContext oldctxt =
+				MemoryContextSwitchTo(TopTransactionContext);
+
+			/*
+			 * refind points to a memory block in
+			 * TopTransactionContext. Register a callback to reset it.
+			 */
+			MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+			refind = (int *) palloc(refindsize * sizeof(int));
+			MemoryContextSwitchTo(oldctxt);
+		}
+	}
+
+	/* Prepare WaitEventSet for waiting on the waitnodes. */
+	n = 0;
+	for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+		 i = bms_next_member(waitnodes, i))
+	{
+		refind[i] = i;
+		if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+			n++;
+	}
+
+	/* Return immediately if no node to wait. */
+	if (n == 0)
+	{
+		FreeWaitEventSet(wes);
+		return NULL;
+	}
+
+	noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+								 EVENT_BUFFER_SIZE,
+								 WAIT_EVENT_ASYNC_WAIT);
+	FreeWaitEventSet(wes);
+	if (noccurred == 0)
+		return NULL;
+
+	for (i = 0 ; i < noccurred ; i++)
+	{
+		WaitEvent *w = &occurred_event[i];
+
+		if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+		{
+			int n = *(int*)w->user_data;
+
+			fired_events = bms_add_member(fired_events, n);
+		}
+	}
+
+	return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 88919e62fa..60c36ee048 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 /* Shared state for parallel-aware Append. */
@@ -80,6 +81,7 @@ struct ParallelAppendState
 #define INVALID_SUBPLAN_INDEX		-1
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
@@ -103,22 +105,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	PlanState **appendplanstates;
 	Bitmapset  *validsubplans;
 	int			nplans;
+	int			nasyncplans;
 	int			firstvalid;
 	int			i,
 				j;
 
 	/* check for unsupported flags */
-	Assert(!(eflags & EXEC_FLAG_MARK));
+	Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
 	/*
 	 * create new AppendState for our append node
 	 */
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
-	appendstate->ps.ExecProcNode = ExecAppend;
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
-	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+	appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 
 	/* If run-time partition pruning is enabled, then set that up now */
 	if (node->part_prune_info != NULL)
@@ -152,11 +154,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 
 		/*
 		 * When no run-time pruning is required and there's at least one
-		 * subplan, we can fill as_valid_subplans immediately, preventing
+		 * subplan, we can fill as_valid_syncsubplans immediately, preventing
 		 * later calls to ExecFindMatchingSubPlans.
 		 */
 		if (!prunestate->do_exec_prune && nplans > 0)
-			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+			appendstate->as_valid_syncsubplans =
+				bms_add_range(NULL, node->nasyncplans, nplans - 1);
 	}
 	else
 	{
@@ -167,8 +170,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 		 * subplans as valid; they must also all be initialized.
 		 */
 		Assert(nplans > 0);
-		appendstate->as_valid_subplans = validsubplans =
-			bms_add_range(NULL, 0, nplans - 1);
+		validsubplans =	bms_add_range(NULL, 0, nplans - 1);
+		appendstate->as_valid_syncsubplans =
+			bms_add_range(NULL, node->nasyncplans, nplans - 1);
 		appendstate->as_prune_state = NULL;
 	}
 
@@ -192,10 +196,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	 */
 	j = 0;
 	firstvalid = nplans;
+	nasyncplans = 0;
+
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
+		int			sub_eflags = eflags;
+
+		/* Let async-capable subplans run asynchronously */
+		if (i < node->nasyncplans)
+		{
+			sub_eflags |= EXEC_FLAG_ASYNC;
+			nasyncplans++;
+		}
 
 		/*
 		 * Record the lowest appendplans index which is a valid partial plan.
@@ -203,13 +217,46 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 		if (i >= node->first_partial_plan && j < firstvalid)
 			firstvalid = j;
 
-		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+		appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
 	}
 
 	appendstate->as_first_partial_plan = firstvalid;
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
 
+	/* fill in async stuff */
+	appendstate->as_nasyncplans = nasyncplans;
+	appendstate->as_syncdone = (nasyncplans == nplans);
+	appendstate->as_exec_prune = false;
+
+	/* choose appropriate version of Exec function */
+	if (appendstate->as_nasyncplans == 0)
+		appendstate->ps.ExecProcNode = ExecAppend;
+	else
+		appendstate->ps.ExecProcNode = ExecAppendAsync;
+
+	if (appendstate->as_nasyncplans)
+	{
+		appendstate->as_asyncresult = (TupleTableSlot **)
+			palloc0(appendstate->as_nasyncplans * sizeof(TupleTableSlot *));
+
+		/* initially, all async requests need a request */
+		appendstate->as_needrequest =
+			bms_add_range(NULL, 0, appendstate->as_nasyncplans - 1);
+
+		/*
+		 * ExecAppendAsync needs as_valid_syncsubplans to handle async
+		 * subnodes.
+		 */
+		if (appendstate->as_prune_state != NULL &&
+			appendstate->as_prune_state->do_exec_prune)
+		{
+			Assert(appendstate->as_valid_syncsubplans == NULL);
+
+			appendstate->as_exec_prune = true;
+		}
+	}
+
 	/*
 	 * Miscellaneous initialization
 	 */
@@ -233,7 +280,7 @@ ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
 
-	if (node->as_whichplan < 0)
+	if (node->as_whichsyncplan < 0)
 	{
 		/* Nothing to do if there are no subplans */
 		if (node->as_nplans == 0)
@@ -243,11 +290,13 @@ ExecAppend(PlanState *pstate)
 		 * If no subplan has been chosen, we must choose one before
 		 * proceeding.
 		 */
-		if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+		if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
 			!node->choose_next_subplan(node))
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 
+	Assert(node->as_nasyncplans == 0);
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -258,8 +307,9 @@ ExecAppend(PlanState *pstate)
 		/*
 		 * figure out which subplan we are currently processing
 		 */
-		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
-		subnode = node->appendplans[node->as_whichplan];
+		Assert(node->as_whichsyncplan >= 0 &&
+			   node->as_whichsyncplan < node->as_nplans);
+		subnode = node->appendplans[node->as_whichsyncplan];
 
 		/*
 		 * get a tuple from the subplan
@@ -282,6 +332,172 @@ ExecAppend(PlanState *pstate)
 	}
 }
 
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+	AppendState *node = castNode(AppendState, pstate);
+	Bitmapset *needrequest;
+	int	i;
+
+	Assert(node->as_nasyncplans > 0);
+
+restart:
+	if (node->as_nasyncresult > 0)
+	{
+		--node->as_nasyncresult;
+		return node->as_asyncresult[node->as_nasyncresult];
+	}
+
+	if (node->as_exec_prune)
+	{
+		Bitmapset *valid_subplans =
+			ExecFindMatchingSubPlans(node->as_prune_state);
+
+		/* Distribute valid subplans into sync and async */
+		node->as_needrequest =
+			bms_intersect(node->as_needrequest, valid_subplans);
+		node->as_valid_syncsubplans =
+			bms_difference(valid_subplans, node->as_needrequest);
+
+		node->as_exec_prune = false;
+	}
+
+	needrequest = node->as_needrequest;
+	node->as_needrequest = NULL;
+	while ((i = bms_first_member(needrequest)) >= 0)
+	{
+		TupleTableSlot *slot;
+		PlanState *subnode = node->appendplans[i];
+
+		slot = ExecProcNode(subnode);
+		if (subnode->asyncstate == AS_AVAILABLE)
+		{
+			if (!TupIsNull(slot))
+			{
+				node->as_asyncresult[node->as_nasyncresult++] = slot;
+				node->as_needrequest = bms_add_member(node->as_needrequest, i);
+			}
+		}
+		else
+			node->as_pending_async = bms_add_member(node->as_pending_async, i);
+	}
+	bms_free(needrequest);
+
+	for (;;)
+	{
+		TupleTableSlot *result;
+
+		/* return now if a result is available */
+		if (node->as_nasyncresult > 0)
+		{
+			--node->as_nasyncresult;
+			return node->as_asyncresult[node->as_nasyncresult];
+		}
+
+		while (!bms_is_empty(node->as_pending_async))
+		{
+			/* Don't wait for async nodes if any sync node exists. */
+			long timeout = node->as_syncdone ? -1 : 0;
+			Bitmapset *fired;
+			int i;
+
+			fired = ExecAsyncEventWait(node->appendplans,
+									   node->as_pending_async,
+									   timeout);
+
+			if (bms_is_empty(fired) && node->as_syncdone)
+			{
+				/*
+				 * We come here when all the subnodes had fired before
+				 * waiting. Retry fetching from the nodes.
+				 */
+				node->as_needrequest = node->as_pending_async;
+				node->as_pending_async = NULL;
+				goto restart;
+			}
+
+			while ((i = bms_first_member(fired)) >= 0)
+			{
+				TupleTableSlot *slot;
+				PlanState *subnode = node->appendplans[i];
+				slot = ExecProcNode(subnode);
+
+				Assert(subnode->asyncstate == AS_AVAILABLE);
+
+				if (!TupIsNull(slot))
+				{
+					node->as_asyncresult[node->as_nasyncresult++] = slot;
+					node->as_needrequest =
+						bms_add_member(node->as_needrequest, i);
+				}
+
+				node->as_pending_async =
+					bms_del_member(node->as_pending_async, i);
+			}
+			bms_free(fired);
+
+			/* return now if a result is available */
+			if (node->as_nasyncresult > 0)
+			{
+				--node->as_nasyncresult;
+				return node->as_asyncresult[node->as_nasyncresult];
+			}
+
+			if (!node->as_syncdone)
+				break;
+		}
+
+		/*
+		 * If there is no asynchronous activity still pending and the
+		 * synchronous activity is also complete, we're totally done scanning
+		 * this node.  Otherwise, we're done with the asynchronous stuff but
+		 * must continue scanning the synchronous children.
+		 */
+
+		if (!node->as_syncdone &&
+			node->as_whichsyncplan == INVALID_SUBPLAN_INDEX)
+			node->as_syncdone = !node->choose_next_subplan(node);
+
+		if (node->as_syncdone)
+		{
+			Assert(bms_is_empty(node->as_pending_async));
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
+
+		/*
+		 * get a tuple from the subplan
+		 */
+		result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+		if (!TupIsNull(result))
+		{
+			/*
+			 * If the subplan gave us something then return it as-is. We do
+			 * NOT make use of the result slot that was set up in
+			 * ExecInitAppend; there's no need for it.
+			 */
+			return result;
+		}
+
+		/*
+		 * Go on to the "next" subplan. If no more subplans, return the empty
+		 * slot set up for us by ExecInitAppend, unless there are async plans
+		 * we have yet to finish.
+		 */
+		if (!node->choose_next_subplan(node))
+		{
+			node->as_syncdone = true;
+			if (bms_is_empty(node->as_pending_async))
+			{
+				Assert(bms_is_empty(node->as_needrequest));
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+		}
+
+		/* Else loop back and try to get a tuple from the new subplan */
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecEndAppend
  *
@@ -324,10 +540,18 @@ ExecReScanAppend(AppendState *node)
 		bms_overlap(node->ps.chgParam,
 					node->as_prune_state->execparamids))
 	{
-		bms_free(node->as_valid_subplans);
-		node->as_valid_subplans = NULL;
+		bms_free(node->as_valid_syncsubplans);
+		node->as_valid_syncsubplans = NULL;
 	}
 
+	/* Reset async state. */
+	for (i = 0; i < node->as_nasyncplans; ++i)
+		ExecShutdownNode(node->appendplans[i]);
+
+	node->as_nasyncresult = 0;
+	node->as_needrequest = bms_add_range(NULL, 0, node->as_nasyncplans - 1);
+	node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
 	for (i = 0; i < node->as_nplans; i++)
 	{
 		PlanState  *subnode = node->appendplans[i];
@@ -348,7 +572,7 @@ ExecReScanAppend(AppendState *node)
 	}
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
-	node->as_whichplan = INVALID_SUBPLAN_INDEX;
+	node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 }
 
 /* ----------------------------------------------------------------
@@ -436,7 +660,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 static bool
 choose_next_subplan_locally(AppendState *node)
 {
-	int			whichplan = node->as_whichplan;
+	int			whichplan = node->as_whichsyncplan;
 	int			nextplan;
 
 	/* We should never be called when there are no subplans */
@@ -451,10 +675,18 @@ choose_next_subplan_locally(AppendState *node)
 	 */
 	if (whichplan == INVALID_SUBPLAN_INDEX)
 	{
-		if (node->as_valid_subplans == NULL)
-			node->as_valid_subplans =
+		/* Shouldn't have an active async node */
+		Assert(bms_is_empty(node->as_needrequest));
+
+		if (node->as_valid_syncsubplans == NULL)
+			node->as_valid_syncsubplans =
 				ExecFindMatchingSubPlans(node->as_prune_state);
 
+		/* Exclude async plans */
+		if (node->as_nasyncplans > 0)
+			bms_del_range(node->as_valid_syncsubplans,
+						  0, node->as_nasyncplans - 1);
+
 		whichplan = -1;
 	}
 
@@ -462,14 +694,14 @@ choose_next_subplan_locally(AppendState *node)
 	Assert(whichplan >= -1 && whichplan <= node->as_nplans);
 
 	if (ScanDirectionIsForward(node->ps.state->es_direction))
-		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
+		nextplan = bms_next_member(node->as_valid_syncsubplans, whichplan);
 	else
-		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
+		nextplan = bms_prev_member(node->as_valid_syncsubplans, whichplan);
 
 	if (nextplan < 0)
 		return false;
 
-	node->as_whichplan = nextplan;
+	node->as_whichsyncplan = nextplan;
 
 	return true;
 }
@@ -490,29 +722,29 @@ choose_next_subplan_for_leader(AppendState *node)
 	/* Backward scan is not supported by parallel-aware plans */
 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
-	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
+	/* We should never be called when there are no sync subplans */
+	Assert(node->as_nplans > node->as_nasyncplans);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
-	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+	if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
 	{
 		/* Mark just-completed subplan as finished. */
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 	}
 	else
 	{
 		/* Start with last subplan. */
-		node->as_whichplan = node->as_nplans - 1;
+		node->as_whichsyncplan = node->as_nplans - 1;
 
 		/*
 		 * If we've yet to determine the valid subplans then do so now.  If
 		 * run-time pruning is disabled then the valid subplans will always be
 		 * set to all subplans.
 		 */
-		if (node->as_valid_subplans == NULL)
+		if (node->as_valid_syncsubplans == NULL)
 		{
-			node->as_valid_subplans =
+			node->as_valid_syncsubplans =
 				ExecFindMatchingSubPlans(node->as_prune_state);
 
 			/*
@@ -524,26 +756,26 @@ choose_next_subplan_for_leader(AppendState *node)
 	}
 
 	/* Loop until we find a subplan to execute. */
-	while (pstate->pa_finished[node->as_whichplan])
+	while (pstate->pa_finished[node->as_whichsyncplan])
 	{
-		if (node->as_whichplan == 0)
+		if (node->as_whichsyncplan == 0)
 		{
 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
-			node->as_whichplan = INVALID_SUBPLAN_INDEX;
+			node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 			LWLockRelease(&pstate->pa_lock);
 			return false;
 		}
 
 		/*
-		 * We needn't pay attention to as_valid_subplans here as all invalid
+		 * We needn't pay attention to as_valid_syncsubplans here as all invalid
 		 * plans have been marked as finished.
 		 */
-		node->as_whichplan--;
+		node->as_whichsyncplan--;
 	}
 
 	/* If non-partial, immediately mark as finished. */
-	if (node->as_whichplan < node->as_first_partial_plan)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	LWLockRelease(&pstate->pa_lock);
 
@@ -571,23 +803,23 @@ choose_next_subplan_for_worker(AppendState *node)
 	/* Backward scan is not supported by parallel-aware plans */
 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
-	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
+	/* We should never be called when there are no sync subplans */
+	Assert(node->as_nplans > node->as_nasyncplans);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
 	/* Mark just-completed subplan as finished. */
-	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	/*
 	 * If we've yet to determine the valid subplans then do so now.  If
 	 * run-time pruning is disabled then the valid subplans will always be set
 	 * to all subplans.
 	 */
-	else if (node->as_valid_subplans == NULL)
+	else if (node->as_valid_syncsubplans == NULL)
 	{
-		node->as_valid_subplans =
+		node->as_valid_syncsubplans =
 			ExecFindMatchingSubPlans(node->as_prune_state);
 		mark_invalid_subplans_as_finished(node);
 	}
@@ -600,30 +832,30 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* Save the plan from which we are starting the search. */
-	node->as_whichplan = pstate->pa_next_plan;
+	node->as_whichsyncplan = pstate->pa_next_plan;
 
 	/* Loop until we find a valid subplan to execute. */
 	while (pstate->pa_finished[pstate->pa_next_plan])
 	{
 		int			nextplan;
 
-		nextplan = bms_next_member(node->as_valid_subplans,
+		nextplan = bms_next_member(node->as_valid_syncsubplans,
 								   pstate->pa_next_plan);
 		if (nextplan >= 0)
 		{
 			/* Advance to the next valid plan. */
 			pstate->pa_next_plan = nextplan;
 		}
-		else if (node->as_whichplan > node->as_first_partial_plan)
+		else if (node->as_whichsyncplan > node->as_first_partial_plan)
 		{
 			/*
 			 * Try looping back to the first valid partial plan, if there is
 			 * one.  If there isn't, arrange to bail out below.
 			 */
-			nextplan = bms_next_member(node->as_valid_subplans,
+			nextplan = bms_next_member(node->as_valid_syncsubplans,
 									   node->as_first_partial_plan - 1);
 			pstate->pa_next_plan =
-				nextplan < 0 ? node->as_whichplan : nextplan;
+				nextplan < 0 ? node->as_whichsyncplan : nextplan;
 		}
 		else
 		{
@@ -631,10 +863,10 @@ choose_next_subplan_for_worker(AppendState *node)
 			 * At last plan, and either there are no partial plans or we've
 			 * tried them all.  Arrange to bail out.
 			 */
-			pstate->pa_next_plan = node->as_whichplan;
+			pstate->pa_next_plan = node->as_whichsyncplan;
 		}
 
-		if (pstate->pa_next_plan == node->as_whichplan)
+		if (pstate->pa_next_plan == node->as_whichsyncplan)
 		{
 			/* We've tried everything! */
 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -644,8 +876,8 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* Pick the plan we found, and advance pa_next_plan one more time. */
-	node->as_whichplan = pstate->pa_next_plan;
-	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
+	node->as_whichsyncplan = pstate->pa_next_plan;
+	pstate->pa_next_plan = bms_next_member(node->as_valid_syncsubplans,
 										   pstate->pa_next_plan);
 
 	/*
@@ -654,7 +886,7 @@ choose_next_subplan_for_worker(AppendState *node)
 	 */
 	if (pstate->pa_next_plan < 0)
 	{
-		int			nextplan = bms_next_member(node->as_valid_subplans,
+		int			nextplan = bms_next_member(node->as_valid_syncsubplans,
 											   node->as_first_partial_plan - 1);
 
 		if (nextplan >= 0)
@@ -671,8 +903,8 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* If non-partial, immediately mark as finished. */
-	if (node->as_whichplan < node->as_first_partial_plan)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	LWLockRelease(&pstate->pa_lock);
 
@@ -699,13 +931,13 @@ mark_invalid_subplans_as_finished(AppendState *node)
 	Assert(node->as_prune_state);
 
 	/* Nothing to do if all plans are valid */
-	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
+	if (bms_num_members(node->as_valid_syncsubplans) == node->as_nplans)
 		return;
 
 	/* Mark all non-valid plans as finished */
 	for (i = 0; i < node->as_nplans; i++)
 	{
-		if (!bms_is_member(i, node->as_valid_subplans))
+		if (!bms_is_member(i, node->as_valid_syncsubplans))
 			node->as_pstate->pa_finished[i] = true;
 	}
 }
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 513471ab9b..3bf4aaa63d 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -141,6 +141,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+	scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+	if ((eflags & EXEC_FLAG_ASYNC) != 0)
+		scanstate->fs_async = true;
 
 	/*
 	 * Miscellaneous initialization
@@ -384,3 +388,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
 	if (fdwroutine->ShutdownForeignScan)
 		fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanConfigureWait
+ *
+ *		In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+							  void *caller_data, bool reinit)
+{
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+	return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+												 caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 2719ea45a3..05b625783b 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
 	return a;
 }
 
+/*
+ * bms_del_range
+ *		Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+	int			lwordnum,
+				lbitnum,
+				uwordnum,
+				ushiftbits,
+				wordnum;
+
+	if (lower < 0 || upper < 0)
+		elog(ERROR, "negative bitmapset member not allowed");
+	if (lower > upper)
+		elog(ERROR, "lower range must not be above upper range");
+	uwordnum = WORDNUM(upper);
+
+	if (a == NULL)
+	{
+		a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+		a->nwords = uwordnum + 1;
+	}
+
+	/* ensure we have enough words to store the upper bit */
+	else if (uwordnum >= a->nwords)
+	{
+		int			oldnwords = a->nwords;
+		int			i;
+
+		a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+		a->nwords = uwordnum + 1;
+		/* zero out the enlarged portion */
+		for (i = oldnwords; i < a->nwords; i++)
+			a->words[i] = 0;
+	}
+
+	wordnum = lwordnum = WORDNUM(lower);
+
+	lbitnum = BITNUM(lower);
+	ushiftbits = BITNUM(upper) + 1;
+
+	/*
+	 * Special case when lwordnum is the same as uwordnum we must perform the
+	 * upper and lower masking on the word.
+	 */
+	if (lwordnum == uwordnum)
+	{
+		a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+								| (~(bitmapword) 0) << ushiftbits);
+	}
+	else
+	{
+		/* turn off lbitnum and all bits left of it */
+		a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+		/* turn off all bits for any intermediate words */
+		while (wordnum < uwordnum)
+			a->words[wordnum++] = (bitmapword) 0;
+
+		/* turn off upper's bit and all bits right of it. */
+		a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits;
+	}
+
+	return a;
+}
+
 /*
  * bms_int_members - like bms_intersect, but left input is recycled
  */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index d8cf87e6d0..89a49e2fdc 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -121,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
 	COPY_SCALAR_FIELD(plan_width);
 	COPY_SCALAR_FIELD(parallel_aware);
 	COPY_SCALAR_FIELD(parallel_safe);
+	COPY_SCALAR_FIELD(async_capable);
 	COPY_SCALAR_FIELD(plan_node_id);
 	COPY_NODE_FIELD(targetlist);
 	COPY_NODE_FIELD(qual);
@@ -246,6 +247,8 @@ _copyAppend(const Append *from)
 	COPY_NODE_FIELD(appendplans);
 	COPY_SCALAR_FIELD(first_partial_plan);
 	COPY_NODE_FIELD(part_prune_info);
+	COPY_SCALAR_FIELD(nasyncplans);
+	COPY_SCALAR_FIELD(referent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index e2f177515d..d4bb44b268 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -334,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
 	WRITE_INT_FIELD(plan_width);
 	WRITE_BOOL_FIELD(parallel_aware);
 	WRITE_BOOL_FIELD(parallel_safe);
+	WRITE_BOOL_FIELD(async_capable);
 	WRITE_INT_FIELD(plan_node_id);
 	WRITE_NODE_FIELD(targetlist);
 	WRITE_NODE_FIELD(qual);
@@ -436,6 +437,8 @@ _outAppend(StringInfo str, const Append *node)
 	WRITE_NODE_FIELD(appendplans);
 	WRITE_INT_FIELD(first_partial_plan);
 	WRITE_NODE_FIELD(part_prune_info);
+	WRITE_INT_FIELD(nasyncplans);
+	WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 42050ab719..63af7c02d8 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1572,6 +1572,7 @@ ReadCommonPlan(Plan *local_node)
 	READ_INT_FIELD(plan_width);
 	READ_BOOL_FIELD(parallel_aware);
 	READ_BOOL_FIELD(parallel_safe);
+	READ_BOOL_FIELD(async_capable);
 	READ_INT_FIELD(plan_node_id);
 	READ_NODE_FIELD(targetlist);
 	READ_NODE_FIELD(qual);
@@ -1672,6 +1673,8 @@ _readAppend(void)
 	READ_NODE_FIELD(appendplans);
 	READ_INT_FIELD(first_partial_plan);
 	READ_NODE_FIELD(part_prune_info);
+	READ_INT_FIELD(nasyncplans);
+	READ_INT_FIELD(referent);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 744eed187d..ba18dd88a8 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -300,6 +300,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
 									 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
 											 GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1082,6 +1083,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	bool		tlist_was_changed = false;
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *subplans = NIL;
+	List	   *asyncplans = NIL;
+	List	   *syncplans = NIL;
+	List	   *asyncpaths = NIL;
+	List	   *syncpaths = NIL;
+	List	   *newsubpaths = NIL;
 	ListCell   *subpaths;
 	RelOptInfo *rel = best_path->path.parent;
 	PartitionPruneInfo *partpruneinfo = NULL;
@@ -1090,6 +1096,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	Oid		   *nodeSortOperators = NULL;
 	Oid		   *nodeCollations = NULL;
 	bool	   *nodeNullsFirst = NULL;
+	int			nasyncplans = 0;
+	bool		first = true;
+	bool		referent_is_sync = true;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -1219,9 +1228,36 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 			}
 		}
 
-		subplans = lappend(subplans, subplan);
+		/*
+		 * Classify as async-capable or not. If we have decided to run the
+		 * children in parallel, we cannot any one of them run asynchronously.
+		 */
+		if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+		{
+			subplan->async_capable = true;
+			asyncplans = lappend(asyncplans, subplan);
+			asyncpaths = lappend(asyncpaths, subpath);
+			++nasyncplans;
+			if (first)
+				referent_is_sync = false;
+		}
+		else
+		{
+			syncplans = lappend(syncplans, subplan);
+			syncpaths = lappend(syncpaths, subpath);
+		}
+
+		first = false;
 	}
 
+	/*
+	 * subplan contains asyncplans in the first half, if any, and sync plans in
+	 * another half, if any.  We need that the same for subpaths to make
+	 * partition pruning information in sync with subplans.
+	 */
+	subplans = list_concat(asyncplans, syncplans);
+	newsubpaths = list_concat(asyncpaths, syncpaths);
+
 	/*
 	 * If any quals exist, they may be useful to perform further partition
 	 * pruning during execution.  Gather information needed by the executor to
@@ -1249,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 		if (prunequal != NIL)
 			partpruneinfo =
 				make_partition_pruneinfo(root, rel,
-										 best_path->subpaths,
+										 newsubpaths,
 										 best_path->partitioned_rels,
 										 prunequal);
 	}
@@ -1257,6 +1293,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	plan->appendplans = subplans;
 	plan->first_partial_plan = best_path->first_partial_path;
 	plan->part_prune_info = partpruneinfo;
+	plan->nasyncplans = nasyncplans;
+	plan->referent = referent_is_sync ? nasyncplans : 0;
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -7016,3 +7054,27 @@ is_projection_capable_plan(Plan *plan)
 	}
 	return true;
 }
+
+/*
+ * is_projection_capable_path
+ *		Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+					return true;
+			}
+		default:
+			break;
+	}
+	return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d7f99d9944..79a2562454 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3882,6 +3882,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_XACT_GROUP_UPDATE:
 			event_name = "XactGroupUpdate";
 			break;
+		case WAIT_EVENT_ASYNC_WAIT:
+			event_name = "AsyncExecWait";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index ffcb54968f..a4de6d90e2 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -300,7 +300,7 @@ SysLoggerMain(int argc, char *argv[])
 	 * syslog pipe, which implies that all other backends have exited
 	 * (including the postmaster).
 	 */
-	wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+	wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
 	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 #ifndef WIN32
 	AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 076c3c019f..f7b5587d7f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4584,10 +4584,14 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan)
 	 * tlists according to one of the children, and the first one is the most
 	 * natural choice.  Likewise special-case ModifyTable to pretend that the
 	 * first child plan is the OUTER referent; this is to support RETURNING
-	 * lists containing references to non-target relations.
+	 * lists containing references to non-target relations. For Append, use the
+	 * explicitly specified referent.
 	 */
 	if (IsA(plan, Append))
-		dpns->outer_plan = linitial(((Append *) plan)->appendplans);
+	{
+		Append *app = (Append *) plan;
+		dpns->outer_plan = list_nth(app->appendplans, app->referent);
+	}
 	else if (IsA(plan, MergeAppend))
 		dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans);
 	else if (IsA(plan, ModifyTable))
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 237ca9fa30..27742a1641 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -1416,7 +1416,7 @@ void
 ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
 {
 	/*
-	 * XXXX: There's no property to show as an identier of a wait event set,
+	 * XXXX: There's no property to show as an identifier of a wait event set,
 	 * use its pointer instead.
 	 */
 	if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
@@ -1431,7 +1431,7 @@ static void
 PrintWESLeakWarning(WaitEventSet *events)
 {
 	/*
-	 * XXXX: There's no property to show as an identier of a wait event set,
+	 * XXXX: There's no property to show as an identifier of a wait event set,
 	 * use its pointer instead.
 	 */
 	elog(WARNING, "wait event set leak: %p still referenced",
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..3b6bf4a516
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,22 @@
+/*--------------------------------------------------------------------
+ * execAsync.c
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/backend/executor/execAsync.c
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+								   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+									 long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index c7deeac662..aca9e2bddd 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
 #define EXEC_FLAG_MARK			0x0008	/* need mark/restore */
 #define EXEC_FLAG_SKIP_TRIGGERS 0x0010	/* skip AfterTrigger calls */
 #define EXEC_FLAG_WITH_NO_DATA	0x0020	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0040	/* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 326d713ebf..71a233b41f 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 											ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+										  WaitEventSet *wes,
+										  void *caller_data, bool reinit);
 
 #endif							/* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..853ba2b5ad 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
 															List *fdw_private,
 															RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+													WaitEventSet *wes,
+													void *caller_data,
+													bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
 	GetForeignPlan_function GetForeignPlan;
 	BeginForeignScan_function BeginForeignScan;
 	IterateForeignScan_function IterateForeignScan;
+	IterateForeignScan_function IterateForeignScanAsync;
 	ReScanForeignScan_function ReScanForeignScan;
 	EndForeignScan_function EndForeignScan;
 
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
 	InitializeDSMForeignScan_function InitializeDSMForeignScan;
 	ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
 	InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+	/* Support functions for asynchronous execution */
+	IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
 	ShutdownForeignScan_function ShutdownForeignScan;
 
 	/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index d113c271ee..177e6218cb 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);
 
 /* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 98e0072b8a..cd50494c74 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -938,6 +938,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+	AS_AVAILABLE,
+	AS_WAITING
+} AsyncState;
+
 typedef struct PlanState
 {
 	NodeTag		type;
@@ -1026,6 +1032,11 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+
+	/* Async subnode execution stuff */
+	AsyncState	asyncstate;
+
+	int32		padding;			/* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1221,14 +1232,21 @@ struct AppendState
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
 	int			as_nplans;
-	int			as_whichplan;
+	int			as_whichsyncplan; /* which sync plan is being executed  */
 	int			as_first_partial_plan;	/* Index of 'appendplans' containing
 										 * the first partial plan */
+	int			as_nasyncplans;	/* # of async-capable children */
 	ParallelAppendState *as_pstate; /* parallel coordination info */
 	Size		pstate_len;		/* size of parallel coordination info */
 	struct PartitionPruneState *as_prune_state;
-	Bitmapset  *as_valid_subplans;
+	Bitmapset  *as_valid_syncsubplans;
 	bool		(*choose_next_subplan) (AppendState *);
+	bool		as_syncdone;	/* all synchronous plans done? */
+	Bitmapset  *as_needrequest;	/* async plans needing a new request */
+	Bitmapset  *as_pending_async;	/* pending async plans */
+	TupleTableSlot **as_asyncresult;	/* results of each async plan */
+	int			as_nasyncresult;	/* # of valid entries in as_asyncresult */
+	bool		as_exec_prune;	/* runtime pruning needed for async exec? */
 };
 
 /* ----------------
@@ -1796,6 +1814,7 @@ typedef struct ForeignScanState
 	Size		pscan_len;		/* size of parallel coordination information */
 	/* use struct pointer to avoid including fdwapi.h here */
 	struct FdwRoutine *fdwroutine;
+	bool		fs_async;
 	void	   *fdw_state;		/* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 83e01074ed..abad89b327 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -135,6 +135,11 @@ typedef struct Plan
 	bool		parallel_aware; /* engage parallel-aware logic? */
 	bool		parallel_safe;	/* OK to use as part of parallel plan? */
 
+	/*
+	 * information needed for asynchronous execution
+	 */
+	bool		async_capable;  /* engage asynchronous execution logic? */
+
 	/*
 	 * Common structural data for all Plan types.
 	 */
@@ -262,6 +267,10 @@ typedef struct Append
 
 	/* Info for run-time subplan pruning; NULL if we're not doing that */
 	struct PartitionPruneInfo *part_prune_info;
+
+	/* Async child node execution stuff */
+	int			nasyncplans;	/* # async subplans, always at start of list */
+	int			referent; 		/* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index c55dc1481c..2259910637 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -887,7 +887,8 @@ typedef enum
 	WAIT_EVENT_REPLICATION_SLOT_DROP,
 	WAIT_EVENT_SAFE_SNAPSHOT,
 	WAIT_EVENT_SYNC_REP,
-	WAIT_EVENT_XACT_GROUP_UPDATE
+	WAIT_EVENT_XACT_GROUP_UPDATE,
+	WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.18.2

>From 4c207a7901f0a9d05aacb5ce46a7f1daa83ce474 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH v3 3/3] async postgres_fdw

---
 contrib/postgres_fdw/connection.c             |  28 +
 .../postgres_fdw/expected/postgres_fdw.out    | 222 ++++---
 contrib/postgres_fdw/postgres_fdw.c           | 603 +++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.h           |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  20 +-
 5 files changed, 694 insertions(+), 181 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 52d1fe3563..d9edc5e4de 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
 	bool		invalidated;	/* true if reconnect is pending */
 	uint32		server_hashvalue;	/* hash value of foreign server OID */
 	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
+	void		*storage;		/* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -202,6 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
 			 entry->conn, server->servername, user->umid, user->userid);
+		entry->storage = NULL;
 	}
 
 	/*
@@ -215,6 +217,32 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	return entry->conn;
 }
 
+/*
+ * Returns the connection specific storage for this user. Allocate with
+ * initsize if not exists.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+	bool		found;
+	ConnCacheEntry *entry;
+	ConnCacheKey key;
+
+	/* Find storage using the same key with GetConnection */
+	key = user->umid;
+	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+	Assert(found);
+
+	/* Create one if not yet. */
+	if (entry->storage == NULL)
+	{
+		entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+		memset(entry->storage, 0, initsize);
+	}
+
+	return entry->storage;
+}
+
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 82fc1290ef..29aa09db8e 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6973,7 +6973,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
 INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |  aa   
 ----------+-------
  a        | aaa
@@ -7001,7 +7001,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -7029,7 +7029,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -7057,7 +7057,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | newtoo
@@ -7127,35 +7127,41 @@ insert into bar2 values(3,33,33);
 insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+                                                   QUERY PLAN                                                    
+-----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar bar_1
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+                     Sort Key: bar_1.f1
+                     ->  Seq Scan on public.bar bar_1
+                           Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
                ->  Foreign Scan on public.bar2 bar_2
                      Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo foo_1
-                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo foo_1
+                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
  f1 | f2 
 ----+----
   1 | 11
@@ -7165,35 +7171,41 @@ select * from bar where f1 in (select f1 from foo) for update;
 (4 rows)
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+                                                   QUERY PLAN                                                   
+----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar bar_1
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+                     Sort Key: bar_1.f1
+                     ->  Seq Scan on public.bar bar_1
+                           Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
                ->  Foreign Scan on public.bar2 bar_2
                      Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo foo_1
-                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo foo_1
+                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
  f1 | f2 
 ----+----
   1 | 11
@@ -7223,11 +7235,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo foo_1
-                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo foo_1
+                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
    ->  Hash Join
          Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
@@ -7241,12 +7254,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo foo_1
-                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+                           ->  Seq Scan on public.foo foo_1
+                                 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
+(41 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
 select tableoid::regclass, * from bar order by 1,2;
@@ -7276,16 +7290,17 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
+               Async subplans: 2 
+               ->  Async Foreign Scan on public.foo2 foo_1
+                     Output: ROW(foo_1.f1), foo_1.f1
+                     Remote SQL: SELECT f1 FROM public.loct1
+               ->  Async Foreign Scan on public.foo2 foo_3
+                     Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
+                     Remote SQL: SELECT f1 FROM public.loct1
                ->  Seq Scan on public.foo
                      Output: ROW(foo.f1), foo.f1
-               ->  Foreign Scan on public.foo2 foo_1
-                     Output: ROW(foo_1.f1), foo_1.f1
-                     Remote SQL: SELECT f1 FROM public.loct1
                ->  Seq Scan on public.foo foo_2
                      Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-               ->  Foreign Scan on public.foo2 foo_3
-                     Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
-                     Remote SQL: SELECT f1 FROM public.loct1
          ->  Hash
                Output: bar.f1, bar.f2, bar.ctid
                ->  Seq Scan on public.bar
@@ -7303,17 +7318,18 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on public.foo2 foo_1
+                           Output: ROW(foo_1.f1), foo_1.f1
+                           Remote SQL: SELECT f1 FROM public.loct1
+                     ->  Async Foreign Scan on public.foo2 foo_3
+                           Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
+                           Remote SQL: SELECT f1 FROM public.loct1
                      ->  Seq Scan on public.foo
                            Output: ROW(foo.f1), foo.f1
-                     ->  Foreign Scan on public.foo2 foo_1
-                           Output: ROW(foo_1.f1), foo_1.f1
-                           Remote SQL: SELECT f1 FROM public.loct1
                      ->  Seq Scan on public.foo foo_2
                            Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-                     ->  Foreign Scan on public.foo2 foo_3
-                           Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
-                           Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+(47 rows)
 
 update bar set f2 = f2 + 100
 from
@@ -7463,27 +7479,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2 bar_1
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2 bar_1
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2 bar_1
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2 bar_1
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
   1 | 311
   2 | 322
-  6 | 266
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
@@ -8558,11 +8580,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 =0 ORDER BY 1,2,3;
   a  |  b  |  c   
@@ -8597,20 +8620,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
 -- with whole-row reference; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
-                       QUERY PLAN                       
---------------------------------------------------------
+                          QUERY PLAN                          
+--------------------------------------------------------------
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Hash Full Join
          Hash Cond: (t1.a = t2.b)
          ->  Append
-               ->  Foreign Scan on ftprt1_p1 t1_1
-               ->  Foreign Scan on ftprt1_p2 t1_2
+               Async subplans: 2 
+               ->  Async Foreign Scan on ftprt1_p1 t1_1
+               ->  Async Foreign Scan on ftprt1_p2 t1_2
          ->  Hash
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2_1
-                     ->  Foreign Scan on ftprt2_p2 t2_2
-(11 rows)
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on ftprt2_p1 t2_1
+                     ->  Async Foreign Scan on ftprt2_p2 t2_2
+(13 rows)
 
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
        wr       |       wr       
@@ -8639,11 +8664,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2;
   a  |  b  
@@ -8696,21 +8722,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
 -- test FOR UPDATE; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                             QUERY PLAN                             
+--------------------------------------------------------------------
  LockRows
    ->  Sort
          Sort Key: t1.a
          ->  Hash Join
                Hash Cond: (t2.b = t1.a)
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2_1
-                     ->  Foreign Scan on ftprt2_p2 t2_2
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on ftprt2_p1 t2_1
+                     ->  Async Foreign Scan on ftprt2_p2 t2_2
                ->  Hash
                      ->  Append
-                           ->  Foreign Scan on ftprt1_p1 t1_1
-                           ->  Foreign Scan on ftprt1_p2 t1_2
-(12 rows)
+                           Async subplans: 2 
+                           ->  Async Foreign Scan on ftprt1_p1 t1_1
+                           ->  Async Foreign Scan on ftprt1_p2 t1_2
+(14 rows)
 
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
   a  |  b  
@@ -8745,18 +8773,19 @@ ANALYZE fpagg_tab_p3;
 SET enable_partitionwise_aggregate TO false;
 EXPLAIN (COSTS OFF)
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
-                        QUERY PLAN                         
------------------------------------------------------------
+                           QUERY PLAN                            
+-----------------------------------------------------------------
  Sort
    Sort Key: pagg_tab.a
    ->  HashAggregate
          Group Key: pagg_tab.a
          Filter: (avg(pagg_tab.b) < '22'::numeric)
          ->  Append
-               ->  Foreign Scan on fpagg_tab_p1 pagg_tab_1
-               ->  Foreign Scan on fpagg_tab_p2 pagg_tab_2
-               ->  Foreign Scan on fpagg_tab_p3 pagg_tab_3
-(9 rows)
+               Async subplans: 3 
+               ->  Async Foreign Scan on fpagg_tab_p1 pagg_tab_1
+               ->  Async Foreign Scan on fpagg_tab_p2 pagg_tab_2
+               ->  Async Foreign Scan on fpagg_tab_p3 pagg_tab_3
+(10 rows)
 
 -- Plan with partitionwise aggregates is enabled
 SET enable_partitionwise_aggregate TO true;
@@ -8767,13 +8796,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: pagg_tab.a
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 3 
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
-(9 rows)
+(10 rows)
 
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
  a  | sum  | min | count 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..b04b6a0e54 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,8 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -35,6 +37,7 @@
 #include "optimizer/restrictinfo.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "postgres_fdw.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
@@ -56,6 +59,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -122,11 +128,29 @@ enum FdwDirectModifyPrivateIndex
 	FdwDirectModifyPrivateSetProcessed
 };
 
+/*
+ * Connection common state - shared among all PgFdwState instances using the
+ * same connection.
+ */
+typedef struct PgFdwConnCommonState
+{
+	ForeignScanState   *leader;		/* leader node of this connection */
+	bool				busy;		/* true if this connection is busy */
+} PgFdwConnCommonState;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+	PGconn	   *conn;					/* connection for the scan */
+	PgFdwConnCommonState *commonstate;	/* connection common state */
+} PgFdwState;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table. NULL
 								 * for a foreign join scan. */
 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
@@ -137,7 +161,6 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -153,6 +176,12 @@ typedef struct PgFdwScanState
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
+	bool		async;			/* true if run asynchronously */
+	bool		queued;			/* true if this node is in waiter queue */
+	ForeignScanState *waiter;	/* Next node to run a query among nodes
+								 * sharing the same connection */
+	ForeignScanState *last_waiter;	/* last element in waiter queue.
+									 * valid only on the leader node */
 
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
@@ -166,11 +195,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -197,6 +226,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
@@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
 											RangeTblEntry *target_rte,
 											Relation target_relation);
@@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 										 RelOptInfo *input_rel,
 										 RelOptInfo *output_rel,
 										 void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+											  WaitEventSet *wes,
+											  void *caller_data, bool reinit);
 
 /*
  * Helper functions
@@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 									  EquivalenceClass *ec, EquivalenceMember *em,
 									  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   RangeTblEntry *rte,
@@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->IterateForeignScan = postgresIterateForeignScan;
 	routine->ReScanForeignScan = postgresReScanForeignScan;
 	routine->EndForeignScan = postgresEndForeignScan;
+	routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
 	/* Functions for updating foreign tables */
 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for upper relation push-down */
 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+	/* Support functions for async execution */
+	routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+	routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->s.conn = GetConnection(user, false);
+	fsstate->s.commonstate = (PgFdwConnCommonState *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
+	fsstate->s.commonstate->leader = NULL;
+	fsstate->s.commonstate->busy = false;
+	fsstate->waiter = NULL;
+	fsstate->last_waiter = node;
 
 	/* Assign a unique ID for my cursor */
-	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+	fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
 	fsstate->cursor_exists = false;
 
+	/* Initialize async execution status */
+	fsstate->async = false;
+	fsstate->queued = false;
+
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
@@ -1487,40 +1539,244 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 							 &fsstate->param_values);
 }
 
+/*
+ * Async queue manipulation functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * Enqueue node if it isn't in the queue. Immediately send request it if the
+ * underlying connection is not busy.
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+	PgFdwScanState   *fsstate = GetPgFdwScanState(node);
+	ForeignScanState *leader = fsstate->s.commonstate->leader;
+
+	/*
+	 * Do nothing if the node is already in the queue or already eof'ed.
+	 * Note: leader node is not marked as queued.
+	 */
+	if (leader == node || fsstate->queued || fsstate->eof_reached)
+		return;
+
+	if (leader == NULL)
+	{
+		/* no leader means not busy, send request immediately */
+		request_more_data(node);
+	}
+	else
+	{
+		/* the connection is busy, queue the node */
+		PgFdwScanState   *leader_state = GetPgFdwScanState(leader);
+		PgFdwScanState   *last_waiter_state
+			= GetPgFdwScanState(leader_state->last_waiter);
+
+		last_waiter_state->waiter = node;
+		leader_state->last_waiter = node;
+		fsstate->queued = true;
+	}
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Make the first waiter be the next leader
+ * Returns the new leader or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+	PgFdwScanState *leader_state = GetPgFdwScanState(node);
+	ForeignScanState *next_leader = leader_state->waiter;
+
+	Assert(leader_state->s.commonstate->leader = node);
+
+	if (next_leader)
+	{
+		/* the first waiter becomes the next leader */
+		PgFdwScanState *next_leader_state = GetPgFdwScanState(next_leader);
+		next_leader_state->last_waiter = leader_state->last_waiter;
+		next_leader_state->queued = false;
+	}
+
+	leader_state->waiter = NULL;
+	leader_state->s.commonstate->leader = next_leader;
+
+	return next_leader;
+}
+
+/*
+ * Remove the node from waiter queue.
+ *
+ * Remaining results are cleared if the node is a busy leader.
+ * This intended to be used during node shutdown.
+ */
+static inline void
+remove_async_node(ForeignScanState *node)
+{
+	PgFdwScanState		*fsstate = GetPgFdwScanState(node);
+	ForeignScanState	*leader = fsstate->s.commonstate->leader;
+	PgFdwScanState		*leader_state;
+	ForeignScanState	*prev;
+	PgFdwScanState		*prev_state;
+	ForeignScanState	*cur;
+
+	/* no need to remove me */
+	if (!leader || !fsstate->queued)
+		return;
+
+	leader_state = GetPgFdwScanState(leader);
+
+	if (leader == node)
+	{
+		/* It's the leader */
+		ForeignScanState	*next_leader;
+
+		if (leader_state->s.commonstate->busy)
+		{
+			/*
+			 * this node is waiting for result, absorb the result first so
+			 * that the following commands can be sent on the connection.
+			 */
+			PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+			PGconn *conn = leader_state->s.conn;
+
+			while(PQisBusy(conn))
+				PQclear(PQgetResult(conn));
+
+			leader_state->s.commonstate->busy = false;
+		}
+
+		move_to_next_waiter(node);
+
+		return;
+	}
+
+	/*
+	 * Just remove the node from the queue
+	 *
+	 * Nodes don't have a link to the previous node but anyway this function is
+	 * called on the shutdown path, so we don't bother seeking for faster way
+	 * to do this.
+	 */
+	prev = leader;
+	prev_state = leader_state;
+	cur =  GetPgFdwScanState(prev)->waiter;
+	while (cur)
+	{
+		PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+		if (cur == node)
+		{
+			prev_state->waiter = curstate->waiter;
+
+			/* relink to the previous node if the last node was removed */
+			if (leader_state->last_waiter == cur)
+				leader_state->last_waiter = prev;
+
+			fsstate->queued = false;
+
+			return;
+		}
+		prev = cur;
+		prev_state = curstate;
+		cur = curstate->waiter;
+	}
+}
+
 /*
  * postgresIterateForeignScan
- *		Retrieve next row from the result set, or clear tuple slot to indicate
- *		EOF.
+ *		Retrieve next row from the result set.
+ *
+ *		For synchronous nodes, returns clear tuple slot means EOF.
+ *
+ *		For asynchronous nodes, if clear tuple slot is returned, the caller
+ *		needs to check async state to tell if all tuples received
+ *		(AS_AVAILABLE) or waiting for the next data to come (AS_WAITING).
  */
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
-	/*
-	 * If this is the first call after Begin or ReScan, we need to create the
-	 * cursor on the remote side.
-	 */
-	if (!fsstate->cursor_exists)
-		create_cursor(node);
-
-	/*
-	 * Get some more tuples, if we've run out.
-	 */
+	if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+	{
+		/* we've run out, get some more tuples */
+		if (!node->fs_async)
+		{
+			/*
+			 * finish the running query before sending the next command for
+			 * this node
+			 */
+			if (!fsstate->s.commonstate->busy)
+				vacate_connection((PgFdwState *)fsstate, false);
+
+			request_more_data(node);
+
+			/* Fetch the result immediately. */
+			fetch_received_data(node);
+		}
+		else if (!fsstate->s.commonstate->busy)
+		{
+			/* If the connection is not busy, just send the request. */
+			request_more_data(node);
+		}
+		else
+		{
+			/* The connection is busy, queue the request */
+			bool available = true;
+			ForeignScanState *leader = fsstate->s.commonstate->leader;
+			PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+
+			/* queue the requested node */
+			add_async_waiter(node);
+
+			/*
+			 * The request for the next node cannot be sent before the leader
+			 * responds. Finish the current leader if possible.
+			 */
+			if (PQisBusy(leader_state->s.conn))
+			{
+				int rc = WaitLatchOrSocket(NULL,
+										   WL_SOCKET_READABLE | WL_TIMEOUT |
+										   WL_EXIT_ON_PM_DEATH,
+										   PQsocket(leader_state->s.conn), 0,
+										   WAIT_EVENT_ASYNC_WAIT);
+				if (!(rc & WL_SOCKET_READABLE))
+					available = false;
+			}
+
+			/* fetch the leader's data and enqueue it for the next request */
+			if (available)
+			{
+				fetch_received_data(leader);
+				add_async_waiter(leader);
+			}
+		}
+	}
+
 	if (fsstate->next_tuple >= fsstate->num_tuples)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
-		/* If we didn't get any tuples, must be end of data. */
-		if (fsstate->next_tuple >= fsstate->num_tuples)
-			return ExecClearTuple(slot);
+		/*
+		 * We haven't received a result for the given node this time, return
+		 * with no tuple to give way to another node.
+		 */
+		if (fsstate->eof_reached)
+			node->ss.ps.asyncstate = AS_AVAILABLE;
+		else
+			node->ss.ps.asyncstate = AS_WAITING;
+
+		return ExecClearTuple(slot);
 	}
 
 	/*
 	 * Return the next tuple.
 	 */
+	node->ss.ps.asyncstate = AS_AVAILABLE;
 	ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
 					   slot,
 					   false);
@@ -1535,7 +1791,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	char		sql[64];
 	PGresult   *res;
 
@@ -1543,6 +1799,8 @@ postgresReScanForeignScan(ForeignScanState *node)
 	if (!fsstate->cursor_exists)
 		return;
 
+	vacate_connection((PgFdwState *)fsstate, true);
+
 	/*
 	 * If any internal parameters affecting this node have changed, we'd
 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1571,9 +1829,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->s.conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+		pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
 	PQclear(res);
 
 	/* Now force a fresh FETCH. */
@@ -1591,7 +1849,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
 	if (fsstate == NULL)
@@ -1599,15 +1857,31 @@ postgresEndForeignScan(ForeignScanState *node)
 
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
-		close_cursor(fsstate->conn, fsstate->cursor_number);
+		close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
 	/* Release remote connection */
-	ReleaseConnection(fsstate->conn);
-	fsstate->conn = NULL;
+	ReleaseConnection(fsstate->s.conn);
+	fsstate->s.conn = NULL;
 
 	/* MemoryContexts will be deleted automatically. */
 }
 
+/*
+ * postgresShutdownForeignScan
+ *		Remove asynchrony stuff and cleanup garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+	if (plan->operation != CMD_SELECT)
+		return;
+
+	/* remove the node from waiting queue */
+	remove_async_node(node);
+}
+
 /*
  * postgresAddForeignUpdateTargets
  *		Add resjunk column(s) needed for update/delete on a foreign table
@@ -2372,7 +2646,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->s.conn = GetConnection(user, false);
+	dmstate->s.commonstate = (PgFdwConnCommonState *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
 
 	/* Update the foreign-join-related fields. */
 	if (fsplan->scan.scanrelid == 0)
@@ -2457,7 +2733,11 @@ postgresIterateDirectModify(ForeignScanState *node)
 	 * If this is the first call after Begin, execute the statement.
 	 */
 	if (dmstate->num_tuples == -1)
+	{
+		/* finish running query to send my command */
+		vacate_connection((PgFdwState *)dmstate, true);
 		execute_dml_stmt(node);
+	}
 
 	/*
 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2504,8 +2784,8 @@ postgresEndDirectModify(ForeignScanState *node)
 		PQclear(dmstate->result);
 
 	/* Release remote connection */
-	ReleaseConnection(dmstate->conn);
-	dmstate->conn = NULL;
+	ReleaseConnection(dmstate->s.conn);
+	dmstate->s.conn = NULL;
 
 	/* MemoryContext will be deleted automatically. */
 }
@@ -2703,6 +2983,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_param_join_conds;
 		StringInfoData sql;
 		PGconn	   *conn;
+		PgFdwConnCommonState *commonstate;
 		Selectivity local_sel;
 		QualCost	local_cost;
 		List	   *fdw_scan_tlist = NIL;
@@ -2747,6 +3028,18 @@ estimate_path_cost_size(PlannerInfo *root,
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
+		commonstate = GetConnectionSpecificStorage(fpinfo->user,
+												sizeof(PgFdwConnCommonState));
+		if (commonstate)
+		{
+			PgFdwState tmpstate;
+			tmpstate.conn = conn;
+			tmpstate.commonstate = commonstate;
+
+			/* finish running query to send my command */
+			vacate_connection(&tmpstate, true);
+		}
+
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -3317,11 +3610,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PGconn	   *conn = fsstate->s.conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -3384,50 +3677,120 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Sends the next request of the node. If the given node is different from the
+ * current connection leader, pushes it back to waiter queue and let the given
+ * node be the leader.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	ForeignScanState *leader = fsstate->s.commonstate->leader;
+	PGconn	   *conn = fsstate->s.conn;
+	char		sql[64];
+
+	/* must be non-busy */
+	Assert(!fsstate->s.commonstate->busy);
+	/* must be not-eof'ed */
+	Assert(!fsstate->eof_reached);
+
+	/*
+	 * If this is the first call after Begin or ReScan, we need to create the
+	 * cursor on the remote side.
+	 */
+	if (!fsstate->cursor_exists)
+		create_cursor(node);
+
+	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+			 fsstate->fetch_size, fsstate->cursor_number);
+
+	if (!PQsendQuery(conn, sql))
+		pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+	fsstate->s.commonstate->busy = true;
+
+	/* The node is the current leader, just return. */
+	if (leader == node)
+		return;
+
+	/* Let the node be the leader */
+	if (leader != NULL)
+	{
+		remove_async_node(node);
+		fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+		fsstate->waiter = leader;
+	}
+	else
+	{
+		fsstate->last_waiter = node;
+		fsstate->waiter = NULL;
+	}
+
+	fsstate->s.commonstate->leader = node;
+}
+
+/*
+ * Fetches received data and automatically send requests of the next waiter.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
+	ForeignScanState *waiter;
+
+	/* I should be the current connection leader */
+	Assert(fsstate->s.commonstate->leader == node);
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch if no tuple is remaining
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		fsstate->num_tuples = 0;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
+	else if (fsstate->next_tuple > 0)
+	{
+		/* There's some remains. Move them to the beginning of the store */
+		int n = 0;
+
+		while(fsstate->next_tuple < fsstate->num_tuples)
+			fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+		fsstate->num_tuples = n;
+	}
+
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PGconn	   *conn = fsstate->s.conn;
 		char		sql[64];
-		int			numrows;
+		int			addrows;
+		size_t		newsize;
 		int			i;
 
-		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fsstate->fetch_size, fsstate->cursor_number);
-
-		res = pgfdw_exec_query(conn, sql);
-		/* On error, report the original query, not the FETCH. */
+		res = pgfdw_get_result(conn, fsstate->query);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+		addrows = PQntuples(res);
+		newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+		if (fsstate->tuples)
+			fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+		else
+			fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-		for (i = 0; i < numrows; i++)
+		for (i = 0; i < addrows; i++)
 		{
 			Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-			fsstate->tuples[i] =
+			fsstate->tuples[fsstate->num_tuples + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -3437,22 +3800,73 @@ fetch_more_data(ForeignScanState *node)
 		}
 
 		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
+		if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
 			fsstate->fetch_ct_2++;
 
+		fsstate->next_tuple = 0;
+		fsstate->num_tuples += addrows;
+
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
+		fsstate->eof_reached = (addrows < fsstate->fetch_size);
 	}
 	PG_FINALLY();
 	{
+		fsstate->s.commonstate->busy = false;
+
 		if (res)
 			PQclear(res);
 	}
 	PG_END_TRY();
 
+	/* let the first waiter be the next leader of this connection */
+	waiter = move_to_next_waiter(node);
+
+	/* send the next request if any */
+	if (waiter)
+		request_more_data(waiter);
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Vacate the underlying connection so that this node can send the next query.
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+	PgFdwConnCommonState *commonstate = fdwstate->commonstate;
+	ForeignScanState *leader;
+
+	Assert(commonstate != NULL);
+
+	/* just return if the connection is already available */
+	if (commonstate->leader == NULL || !commonstate->busy)
+		return;
+
+	/*
+	 * let the current connection leader read all of the result for the running
+	 * query
+	 */
+	leader = commonstate->leader;
+	fetch_received_data(leader);
+
+	/* let the first waiter be the next leader of this connection */
+	move_to_next_waiter(leader);
+
+	if (!clear_queue)
+		return;
+
+	/* Clear the waiting list */
+	while (leader)
+	{
+		PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+		fsstate->last_waiter = NULL;
+		leader = fsstate->waiter;
+		fsstate->waiter = NULL;
+	}
+}
+
 /*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
@@ -3566,7 +3980,9 @@ create_foreign_modify(EState *estate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->s.conn = GetConnection(user, true);
+	fmstate->s.commonstate = (PgFdwConnCommonState *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnCommonState));
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Set up remote query information. */
@@ -3653,6 +4069,9 @@ execute_foreign_modify(EState *estate,
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	/* finish running query to send my command */
+	vacate_connection((PgFdwState *)fmstate, true);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -3680,14 +4099,14 @@ execute_foreign_modify(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3695,10 +4114,10 @@ execute_foreign_modify(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -3734,7 +4153,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-			 GetPrepStmtNumber(fmstate->conn));
+			 GetPrepStmtNumber(fmstate->s.conn));
 	p_name = pstrdup(prep_name);
 
 	/*
@@ -3744,12 +4163,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * the prepared statements we use in this module are simple enough that
 	 * the remote server will make the right choices.
 	 */
-	if (!PQsendPrepare(fmstate->conn,
+	if (!PQsendPrepare(fmstate->s.conn,
 					   p_name,
 					   fmstate->query,
 					   0,
 					   NULL))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3757,9 +4176,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 	PQclear(res);
 
 	/* This action shows that the prepare has been done. */
@@ -3888,16 +4307,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->s.conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+			pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
 		PQclear(res);
 		fmstate->p_name = NULL;
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	ReleaseConnection(fmstate->s.conn);
+	fmstate->s.conn = NULL;
 }
 
 /*
@@ -4056,9 +4475,9 @@ execute_dml_stmt(ForeignScanState *node)
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
 	 */
-	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+	if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
 						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+		pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -4066,10 +4485,10 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+		pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
 						   dmstate->query);
 
 	/* Get the number of rows affected. */
@@ -5560,6 +5979,40 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
 	/* XXX Consider parameterized paths for the join relation */
 }
 
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+	return true;
+}
+
+
+/*
+ * Configure waiting event.
+ *
+ * Add wait event so that the ForeignScan node is going to wait for.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+								  void *caller_data, bool reinit)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+	/* Reinit is not supported for now. */
+	Assert(reinit);
+
+	if (fsstate->s.commonstate->leader == node)
+	{
+		AddWaitEventToSet(wes,
+						  WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+						  NULL, caller_data);
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..96af75a33e 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 
 	int			fetch_size;		/* fetch size for this remote table */
+	bool		allow_prefetch;	/* true to allow overlapped fetching  */
 
 	/*
 	 * Name of the relation, for use while EXPLAINing ForeignScan.  It is used
@@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..359208a12a 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1780,25 +1780,25 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1840,12 +1840,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1904,8 +1904,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 -- Test that UPDATE/DELETE with inherited target works with row-level triggers
 CREATE TRIGGER trig_row_before
-- 
2.18.2

Reply via email to