Hello, this is a new version of the parallelism patch based on
postgres_fdw, which is now complete to some extent.

This compares the costs of parallel and non-parallel execution
and chooses the parallel one if it is cheaper by a margin specified
by GUCs. The attached files are:

   - PostgreSQL core changes for parallel execution planning.

   - postgres_fdw parallelization.

   - error-avoidance changes for file_fdw (not necessary for this patch).

   - simple test script to try this patch.


 - Planner changes to handle the cost of parallel execution,
   including an indication of parallel execution in EXPLAIN.

 - GUCs to control how readily the planner goes parallel.

   parallel_cost_threshold is the path total cost above which
   parallel execution is considered.

   parallel_ratio_threshold is the ratio of parallel cost to
   non-parallel cost below which the parallel path is chosen.

 - postgres_fdw, which can run in multiple sessions using snapshot
   export and fetch in parallel for foreign scans on dedicated
   connections.

   The foreign server has a new option 'max_aux_connections', which
   limits the number of connections for parallel execution per
   (server, user) pair.

 - Changes to file_fdw to follow the planner changes.

With the patch applied, the attached SQL script shows the
following results (some line breaks added).

postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
           FROM fvs1 a join fvs1_2 b on (a.a = b.a);
                                QUERY PLAN
Hash Join  (cost=9573392.96..9573393.34 rows=1 width=40 parallel)
           (actual time=2213.400..2213.407 rows=12 loops=1)
 Hash Cond: (a.a = b.a)
 ->  Foreign Scan on fvs1 a
           (cost=9573392.96..9573393.29 rows=10 width=8 parallel)
           (actual time=2199.992..2199.993 rows=10 loops=1)
 ->  Hash  (cost=9573393.29..9573393.29 rows=10 width=36)
           (actual time=13.388..13.388 rows=10 loops=1)
       Buckets: 1024  Batches: 1  Memory Usage: 6kB
       ->  Foreign Scan on fvs1_2 b 
                   (cost=9573392.96..9573393.29 rows=10 width=36 parallel)
                   (actual time=13.376..13.379 rows=10 loops=1)
 Planning time: 4.761 ms
 Execution time: 2227.462 ms
(8 rows)
postgres=# SET parallel_ratio_threshold to 0.0;
postgres=# EXPLAIN ANALYZE SELECT a.a, a.b, b.c
           FROM fvs1 a join fvs1 b on (a.a = b.a);
                                QUERY PLAN
 Hash Join  (cost=318084.32..318084.69 rows=1 width=40)
            (actual time=4302.913..4302.928 rows=12 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Foreign Scan on fvs1 a  (cost=159041.93..159042.26 rows=10 width=8)
                               (actual time=2122.989..2122.992 rows=10 loops=1)
   ->  Hash  (cost=159042.26..159042.26 rows=10 width=500)
             (actual time=2179.900..2179.900 rows=10 loops=1)
         Buckets: 1024  Batches: 1  Memory Usage: 6kB
         ->  Foreign Scan on fvs1 b
                   (cost=159041.93..159042.26 rows=10 width=500)
                   (actual time=2179.856..2179.864 rows=10 loops=1)
 Planning time: 5.085 ms
 Execution time: 4303.728 ms
(8 rows)

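Here, "parallel" indicates that the node includes nodes that run in
parallel. The latter EXPLAIN shows the result when parallel
execution is inhibited (parallel_ratio_threshold = 0.0). Note that,
as pasted, the second query joins fvs1 with itself rather than with
fvs1_2.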

Due to lack of time, I am sorry that the details of this patch
will come later.

Are there any suggestions or opinions?


Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d810c3c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1168,16 +1168,20 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		if (es->format == EXPLAIN_FORMAT_TEXT)
-			appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d)",
+			appendStringInfo(es->str, "  (cost=%.2f..%.2f rows=%.0f width=%d%s)",
 							 plan->startup_cost, plan->total_cost,
-							 plan->plan_rows, plan->plan_width);
+							 plan->plan_rows, plan->plan_width,
+							 plan->parallel_start ? " parallel" : "");
 			ExplainPropertyFloat("Startup Cost", plan->startup_cost, 2, es);
 			ExplainPropertyFloat("Total Cost", plan->total_cost, 2, es);
 			ExplainPropertyFloat("Plan Rows", plan->plan_rows, 0, es);
 			ExplainPropertyInteger("Plan Width", plan->plan_width, es);
+			/* Whether this node includes parts executed in parallel */
+			ExplainPropertyText("Parallel Exec",
+								plan->parallel_start ? "true" : "false", es);
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c81efe9..85d78b4 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -53,6 +53,8 @@ typedef struct pushdown_safety_info
 /* These parameters are set by GUC */
 bool		enable_geqo = false;	/* just in case GUC doesn't set it */
 int			geqo_threshold;
+double		parallel_cost_threshold = 10000;	/* keep in sync with guc.c */
+double		parallel_ratio_threshold = 0.8;	/* keep in sync with guc.c */
 /* Hook for plugins to replace standard_join_search() */
 join_search_hook_type join_search_hook = NULL;
@@ -112,6 +114,121 @@ static void recurse_push_qual(Node *setOp, Query *topquery,
 				  RangeTblEntry *rte, Index rti, Node *qual);
 static void remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel);
 
+/*
+ * choose_parallel_walker -- Walk down the whole path tree, fixing for every
+ * node whether it is executed in parallel or not.
+ *
+ * When run_parallel is true, every visited node that can run in parallel has
+ * its startup and total costs overwritten with its parallel costs; otherwise
+ * the node's "parallel" flag is cleared.  Either way, later planning stages
+ * can treat the node without looking at the parallel cost fields.  The walk
+ * stops at path types that cannot be parallelized yet.
+ */
+void
+choose_parallel_walker(Path *path, bool run_parallel)
+{
+	ListCell *lc;
+	List *subpaths = NIL;
+	Path *left = NULL, *right = NULL;
+
+	switch (nodeTag(path))
+	{
+	case T_Path:
+	case T_IndexPath:
+	case T_BitmapHeapPath:
+	case T_TidPath:
+	case T_BitmapAndPath:
+	case T_BitmapOrPath:
+	case T_ResultPath:
+		/* These paths cannot be parallelized so far; return immediately. */
+		return;
+	case T_ForeignPath:
+		/* Leaf node: only this node itself is adjusted below. */
+		break;
+	case T_NestPath:
+		left  = ((NestPath*)path)->outerjoinpath;
+		right = ((NestPath*)path)->innerjoinpath;
+		break;
+	case T_MergePath:
+		left  = ((MergePath*)path)->jpath.outerjoinpath;
+		right = ((MergePath*)path)->jpath.innerjoinpath;
+		break;
+	case T_HashPath:
+		left  = ((HashPath*)path)->jpath.outerjoinpath;
+		right = ((HashPath*)path)->jpath.innerjoinpath;
+		break;
+	case T_AppendPath:
+		subpaths = ((AppendPath*)path)->subpaths;
+		break;
+	case T_MergeAppendPath:
+		subpaths = ((MergeAppendPath*)path)->subpaths;
+		break;
+	case T_MaterialPath:
+		left = ((MaterialPath*)path)->subpath;
+		break;
+	case T_UniquePath:
+		left = ((UniquePath*)path)->subpath;
+		break;
+	default:
+		elog(ERROR, "unrecognized path node type: %d",
+			 (int) nodeTag(path));
+		break;
+	}
+
+	/*
+	 * Make this node treatable ignoring parallel costs hereafter.  Only a
+	 * node that can itself run in parallel takes over its parallel costs;
+	 * non-parallel nodes below a parallel one may never have had their
+	 * parallel costs filled in.
+	 */
+	if (run_parallel && path->parallel)
+	{
+		path->startup_cost = path->pstartup_cost;
+		path->total_cost = path->ptotal_cost;
+	}
+	else
+	{
+		path->parallel = false;
+	}
+
+	/* Walk into lower levels */
+	if (left)  choose_parallel_walker(left, run_parallel);
+	if (right) choose_parallel_walker(right, run_parallel);
+	foreach (lc, subpaths)
+	{
+		choose_parallel_walker((Path*) lfirst(lc), run_parallel);
+	}
+}
+
+/*
+ * choose_parallel_scans --- Decide whether each path of this relation should
+ * run in parallel or not.
+ *
+ * Each path in the relation's pathlist carries costs for both parallel and
+ * non-parallel execution, so the decision is made by looking only at the top
+ * node of its path tree: run in parallel when the path can, its non-parallel
+ * total cost exceeds parallel_cost_threshold, and its parallel total cost is
+ * below parallel_ratio_threshold times the non-parallel one.  The whole tree
+ * is then fixed up so that later stages can ignore the parallel stuff.
+ *
+ * NOTE(review): rel's cheapest_* paths are not recomputed here after costs
+ * are overwritten, and top paths may share subpaths; confirm this is safe.
+ */
+void
+choose_parallel_scans(RelOptInfo *rel)
+{
+	ListCell *lc;
+
+	foreach (lc, rel->pathlist)
+	{
+		Path *path = (Path*) lfirst(lc);
+		bool run_parallel = (path->parallel &&
+				 path->total_cost > parallel_cost_threshold &&
+				 path->ptotal_cost < path->total_cost * parallel_ratio_threshold);
+
+		choose_parallel_walker(path, run_parallel);
+	}
+}
 
 /*
  * make_one_rel
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 0cdb790..1f5bcd8 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1334,6 +1334,83 @@ cost_sort(Path *path, PlannerInfo *root,
 	path->total_cost = startup_cost + run_cost;
 }
 
+/*
+ * cost_append
+ *	  Determines the cost of an Append node and stores it into *path.
+ *
+ * For non-parallel execution we charge nothing extra for the Append itself,
+ * which perhaps is too optimistic, but since it doesn't do any selection or
+ * projection, it is a pretty cheap node.  If you change this, see also
+ * make_append().
+ *
+ * The Append is marked parallel if any of its subpaths is.  Its parallel
+ * startup cost is then the larger of the largest parallel subpath startup
+ * and the first non-parallel subpath's startup, and its parallel running
+ * cost is the larger of the summed parallel running costs and the serial
+ * running cost of the non-parallel subpaths.  A non-parallel Append gets
+ * parallel costs equal to its ordinary costs.
+ */
+void
+cost_append(Path *path, List *subpaths)
+{
+	Cost		max_par_startup = 0.0;
+	Cost		par_running = 0.0;
+	Cost		nonpar_startup = -1.0;	/* < 0: none seen yet */
+	Cost		nonpar_running = 0.0;
+	ListCell   *lc;
+
+	path->rows = 0;
+	path->startup_cost = 0;
+	path->total_cost = 0;
+	path->parallel = false;
+
+	foreach(lc, subpaths)
+	{
+		Path	   *subpath = (Path *) lfirst(lc);
+
+		path->rows += subpath->rows;
+		if (lc == list_head(subpaths))	/* first node? */
+			path->startup_cost = subpath->startup_cost;
+		path->total_cost += subpath->total_cost;
+		if (subpath->parallel)
+			path->parallel = true;
+	}
+
+	/* Parallel costs default to the ordinary ones */
+	path->pstartup_cost = path->startup_cost;
+	path->ptotal_cost = path->total_cost;
+	if (!path->parallel)
+		return;
+
+	/* calculate cost for parallel execution */
+	foreach(lc, subpaths)
+	{
+		Path	   *subpath = (Path *) lfirst(lc);
+
+		if (subpath->parallel)
+		{
+			if (max_par_startup < subpath->pstartup_cost)
+				max_par_startup = subpath->pstartup_cost;
+			par_running += subpath->ptotal_cost - subpath->pstartup_cost;
+		}
+		else if (nonpar_startup < 0)
+		{
+			/* first non-parallel subpath */
+			nonpar_startup = subpath->startup_cost;
+			nonpar_running = subpath->total_cost - subpath->startup_cost;
+		}
+		else
+		{
+			/* later non-parallel subpaths run after it, in full */
+			nonpar_running += subpath->total_cost;
+		}
+	}
+	path->pstartup_cost = (max_par_startup < nonpar_startup ?
+						   nonpar_startup : max_par_startup);
+	path->ptotal_cost = path->pstartup_cost +
+		(par_running < nonpar_running ? nonpar_running : par_running);
+}
+
 /*
  * cost_merge_append
  *	  Determines and returns the cost of a MergeAppend node.
@@ -1356,12 +1424,17 @@ cost_sort(Path *path, PlannerInfo *root,
  * 'n_streams' is the number of input streams
  * 'input_startup_cost' is the sum of the input streams' startup costs
  * 'input_total_cost' is the sum of the input streams' total costs
+ * 'parallel_estimate' is true to also compute the parallel execution costs
+ * 'input_pstartup_cost' is the sum of the input streams' parallel startup costs
+ * 'input_ptotal_cost' is the sum of the input streams' parallel total costs
  * 'tuples' is the number of tuples in all the streams
 cost_merge_append(Path *path, PlannerInfo *root,
 				  List *pathkeys, int n_streams,
 				  Cost input_startup_cost, Cost input_total_cost,
+				  bool parallel_estimate,
+				  Cost input_pstartup_cost, Cost input_ptotal_cost,
 				  double tuples)
 	Cost		startup_cost = 0;
@@ -1395,6 +1468,13 @@ cost_merge_append(Path *path, PlannerInfo *root,
 	path->startup_cost = startup_cost + input_startup_cost;
 	path->total_cost = startup_cost + run_cost + input_total_cost;
+	if (parallel_estimate)	/* same overhead on top of the parallel inputs */
+	{
+		path->pstartup_cost = startup_cost + input_pstartup_cost;
+		path->ptotal_cost = startup_cost + run_cost + input_ptotal_cost;
+		path->parallel = true;
+	}
@@ -1648,6 +1728,54 @@ cost_group(Path *path, PlannerInfo *root,
 	path->total_cost = total_cost;
+ * Mark the workspace non-parallel, with parallel costs = ordinary costs.
+ */
+static void
+clear_cost_parallel(JoinCostWorkspace *workspace)
+	workspace->pstartup_cost = workspace->startup_cost;
+	workspace->ptotal_cost = workspace->total_cost;
+	workspace->parallel = false;
+ * Set the initial parallel cost numbers of a join workspace; the parallel
+ * startup is the larger input startup.  Call only for parallelizable joins.
+ */
+static void
+initial_cost_parallel(JoinCostWorkspace *workspace,
+					  Cost outer_pstartup_cost,
+					  Cost inner_pstartup_cost)
+	double pstartup =
+		(outer_pstartup_cost > inner_pstartup_cost ?
+		 outer_pstartup_cost : inner_pstartup_cost);
+	workspace->ptotal_cost = 
+		workspace->total_cost -	(workspace->startup_cost - pstartup); /* total less the startup saving */
+	workspace->pstartup_cost = pstartup;
+	workspace->parallel = true;
+ * Set the final parallel cost numbers of a join path from its workspace.
+ * This does nothing if the join won't be executed in parallel.
+ */
+static void
+final_cost_parallel(Path *path, JoinCostWorkspace *workspace,
+					Cost startup_cost, Cost run_cost)
+	if (workspace->parallel)
+	{
+		path->pstartup_cost =
+			startup_cost + workspace->pstartup_cost;
+		path->ptotal_cost =
+			path->pstartup_cost + run_cost;
+		path->parallel = true;
+	}
  * initial_cost_nestloop
  *	  Preliminary estimate of the cost of a nestloop join path.
@@ -1765,6 +1893,17 @@ initial_cost_nestloop(PlannerInfo *root, JoinCostWorkspace *workspace,
 	/* Save private data for final_cost_nestloop */
 	workspace->run_cost = run_cost;
 	workspace->inner_rescan_run_cost = inner_rescan_run_cost;
+	clear_cost_parallel(workspace);
+	if (outer_path->parallel)
+	{
+		/* a non-parallel inner side may lack valid parallel costs */
+		initial_cost_parallel(workspace,
+							  outer_path->pstartup_cost,
+							  inner_path->parallel ?
+							  inner_path->pstartup_cost :
+							  inner_path->startup_cost);
+	}
@@ -1786,7 +1922,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,
 	Path	   *inner_path = path->innerjoinpath;
 	double		outer_path_rows = outer_path->rows;
 	double		inner_path_rows = inner_path->rows;
-	Cost		startup_cost = workspace->startup_cost;
+	Cost		startup_cost = 0.0;	/* added on top of workspace->startup_cost */
 	Cost		run_cost = workspace->run_cost;
 	Cost		inner_rescan_run_cost = workspace->inner_rescan_run_cost;
 	Cost		cpu_per_tuple;
@@ -1861,8 +1997,10 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path,
 	cpu_per_tuple = cpu_tuple_cost + restrict_qual_cost.per_tuple;
 	run_cost += cpu_per_tuple * ntuples;
-	path->path.startup_cost = startup_cost;
-	path->path.total_cost = startup_cost + run_cost;
+	path->path.startup_cost = startup_cost + workspace->startup_cost;
+	path->path.total_cost = startup_cost + workspace->startup_cost + run_cost;
+	final_cost_parallel(&path->path, workspace, startup_cost, run_cost);
@@ -1917,6 +2055,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	Path		sort_path;		/* dummy for result of cost_sort */
+	Cost		outer_startup_cost,	/* outer input startup, incl. any sort */
+				inner_startup_cost;	/* inner input startup, incl. any sort */
 	/* Protect some assumptions below that rowcounts aren't zero or NaN */
 	if (outer_path_rows <= 0 || isnan(outer_path_rows))
@@ -2035,6 +2175,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
+		outer_startup_cost = sort_path.startup_cost;
 		startup_cost += sort_path.startup_cost;
 		startup_cost += (sort_path.total_cost - sort_path.startup_cost)
 			* outerstartsel;
@@ -2043,6 +2184,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
+		outer_startup_cost = outer_path->startup_cost;
 		startup_cost += outer_path->startup_cost;
 		startup_cost += (outer_path->total_cost - outer_path->startup_cost)
 			* outerstartsel;
@@ -2061,6 +2203,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
+		inner_startup_cost = sort_path.startup_cost;
 		startup_cost += sort_path.startup_cost;
 		startup_cost += (sort_path.total_cost - sort_path.startup_cost)
 			* innerstartsel;
@@ -2069,6 +2212,7 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
+		inner_startup_cost = inner_path->startup_cost;
 		startup_cost += inner_path->startup_cost;
 		startup_cost += (inner_path->total_cost - inner_path->startup_cost)
 			* innerstartsel;
@@ -2096,6 +2240,22 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	workspace->inner_rows = inner_rows;
 	workspace->outer_skip_rows = outer_skip_rows;
 	workspace->inner_skip_rows = inner_skip_rows;
+	clear_cost_parallel(workspace);
+	if (outer_path->parallel)
+	{
+		/*
+		 * Estimate each side's parallel startup as its startup cost with its
+		 * total cost replaced by its parallel total cost.  A non-parallel
+		 * inner side may lack valid parallel costs; use its startup as is.
+		 */
+		initial_cost_parallel(workspace,
+							  outer_startup_cost - outer_path->total_cost +
+							  outer_path->ptotal_cost,
+							  inner_path->parallel ?
+							  inner_startup_cost - inner_path->total_cost +
+							  inner_path->ptotal_cost :
+							  inner_startup_cost);
+	}
@@ -2128,7 +2282,7 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
 	double		inner_path_rows = inner_path->rows;
 	List	   *mergeclauses = path->path_mergeclauses;
 	List	   *innersortkeys = path->innersortkeys;
-	Cost		startup_cost = workspace->startup_cost;
+	Cost		startup_cost = 0;	/* added on top of workspace->startup_cost */
 	Cost		run_cost = workspace->run_cost;
 	Cost		inner_run_cost = workspace->inner_run_cost;
 	double		outer_rows = workspace->outer_rows;
@@ -2320,8 +2474,10 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
 	cpu_per_tuple = cpu_tuple_cost + qp_qual_cost.per_tuple;
 	run_cost += cpu_per_tuple * mergejointuples;
-	path->jpath.path.startup_cost = startup_cost;
-	path->jpath.path.total_cost = startup_cost + run_cost;
+	path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+	path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+	final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);
@@ -2485,6 +2641,25 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	workspace->run_cost = run_cost;
 	workspace->numbuckets = numbuckets;
 	workspace->numbatches = numbatches;
+	/* costs for parallel execution */
+	clear_cost_parallel(workspace);
+	/* Consider parallel execution if either input can run in parallel */
+	if (inner_path->parallel || outer_path->parallel)
+	{
+		/*
+		 * The other side may not be parallel, and then may lack valid
+		 * parallel costs; use its plain startup cost instead.
+		 */
+		initial_cost_parallel(workspace,
+							  outer_path->parallel ?
+							  outer_path->pstartup_cost :
+							  outer_path->startup_cost,
+							  inner_path->parallel ?
+							  inner_path->pstartup_cost :
+							  inner_path->startup_cost);
+	}
@@ -2510,7 +2677,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
 	double		outer_path_rows = outer_path->rows;
 	double		inner_path_rows = inner_path->rows;
 	List	   *hashclauses = path->path_hashclauses;
-	Cost		startup_cost = workspace->startup_cost;
+	Cost		startup_cost = 0;	/* added on top of workspace->startup_cost */
 	Cost		run_cost = workspace->run_cost;
 	int			numbuckets = workspace->numbuckets;
 	int			numbatches = workspace->numbatches;
@@ -2700,8 +2867,10 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
 	cpu_per_tuple = cpu_tuple_cost + qp_qual_cost.per_tuple;
 	run_cost += cpu_per_tuple * hashjointuples;
-	path->jpath.path.startup_cost = startup_cost;
-	path->jpath.path.total_cost = startup_cost + run_cost;
+	path->jpath.path.startup_cost = startup_cost + workspace->startup_cost;
+	path->jpath.path.total_cost = path->jpath.path.startup_cost + run_cost;
+	final_cost_parallel(&path->jpath.path, workspace, startup_cost, run_cost);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 4b641a2..67d801a 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -3163,6 +3163,9 @@ copy_path_costsize(Plan *dest, Path *src)
 		dest->startup_cost = src->startup_cost;
 		dest->total_cost = src->total_cost;
+		dest->parallel_start = src->parallel;	/* shown as "parallel" by EXPLAIN */
+		dest->pstartup_cost = src->pstartup_cost;
+		dest->ptotal_cost = src->ptotal_cost;
 		dest->plan_rows = src->rows;
 		dest->plan_width = src->parent->width;
@@ -3170,6 +3173,9 @@ copy_path_costsize(Plan *dest, Path *src)
 		dest->startup_cost = 0;
 		dest->total_cost = 0;
+		dest->parallel_start = false;
+		dest->pstartup_cost = 0;
+		dest->ptotal_cost = 0;
 		dest->plan_rows = 0;
 		dest->plan_width = 0;
@@ -3462,6 +3468,7 @@ ForeignScan *
 make_foreignscan(List *qptlist,
 				 List *qpqual,
 				 Index scanrelid,
+				 bool  parallel,	/* stored into scan.parallel */
 				 List *fdw_exprs,
 				 List *fdw_private)
@@ -3474,6 +3481,7 @@ make_foreignscan(List *qptlist,
 	plan->lefttree = NULL;
 	plan->righttree = NULL;
 	node->scan.scanrelid = scanrelid;
+	node->scan.parallel = parallel;
 	node->fdw_exprs = fdw_exprs;
 	node->fdw_private = fdw_private;
 	/* fsSystemCol will be filled in by create_foreignscan_plan */
@@ -3489,6 +3497,10 @@ make_append(List *appendplans, List *tlist)
 	Plan	   *plan = &node->plan;
 	double		total_size;
 	ListCell   *subnode;
+	Cost		max_par_startup = 0;	/* largest parallel subplan startup */
+	Cost		par_running = 0;	/* summed parallel subplan running costs */
+	Cost		nonpar_startup = -1;	/* < 0: no non-parallel subplan yet */
+	Cost		nonpar_running = 0;	/* serial cost of non-parallel subplans */
 
 	/*
 	 * Compute cost as sum of subplan costs.  We charge nothing extra for the
@@ -3509,12 +3521,34 @@ make_append(List *appendplans, List *tlist)
 		Plan	   *subplan = (Plan *) lfirst(subnode);
 
-		if (subnode == list_head(appendplans))	/* first node? */
-			plan->startup_cost = subplan->startup_cost;
-		plan->total_cost += subplan->total_cost;
+		/*
+		 * Same model as cost_append(): parallel subplans start concurrently
+		 * while the non-parallel ones run one after another.  Subplans chosen
+		 * to run in parallel already carry their parallel costs here.
+		 */
+		if (subplan->parallel_start)
+		{
+			if (max_par_startup < subplan->startup_cost)
+				max_par_startup = subplan->startup_cost;
+			par_running += subplan->total_cost - subplan->startup_cost;
+			plan->parallel_start = true;
+		}
+		else if (nonpar_startup < 0)
+		{
+			nonpar_startup = subplan->startup_cost;
+			nonpar_running = subplan->total_cost - subplan->startup_cost;
+		}
+		else
+		{
+			nonpar_running += subplan->total_cost;
+		}
 		plan->plan_rows += subplan->plan_rows;
 		total_size += subplan->plan_width * subplan->plan_rows;
 	}
+	plan->startup_cost =
+		(max_par_startup < nonpar_startup ? nonpar_startup : max_par_startup);
+	plan->total_cost = plan->startup_cost +
+		(par_running < nonpar_running ? nonpar_running : par_running);
 	if (plan->plan_rows > 0)
 		plan->plan_width = rint(total_size / plan->plan_rows);
diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c
index 93484a0..9d75dd7 100644
--- a/src/backend/optimizer/plan/planmain.c
+++ b/src/backend/optimizer/plan/planmain.c
@@ -240,5 +240,8 @@ query_planner(PlannerInfo *root, List *tlist,
 		final_rel->cheapest_total_path->param_info != NULL)
 		elog(ERROR, "failed to construct the join relation");
+	/* Decide whether each parallelizable path of final_rel runs in parallel */
+	choose_parallel_scans(final_rel);
 	return final_rel;
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 319e8b2..d716dcb 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -900,27 +900,11 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
 										 * unsorted */
 	pathnode->subpaths = subpaths;
-	/*
-	 * We don't bother with inventing a cost_append(), but just do it here.
-	 *
-	 * Compute rows and costs as sums of subplan rows and costs.  We charge
-	 * nothing extra for the Append itself, which perhaps is too optimistic,
-	 * but since it doesn't do any selection or projection, it is a pretty
-	 * cheap node.  If you change this, see also make_append().
-	 */
-	pathnode->path.rows = 0;
-	pathnode->path.startup_cost = 0;
-	pathnode->path.total_cost = 0;
+	cost_append(&pathnode->path, subpaths);	/* rows, costs, parallel costs */
 	foreach(l, subpaths)
-		Path	   *subpath = (Path *) lfirst(l);
-		pathnode->path.rows += subpath->rows;
-		if (l == list_head(subpaths))	/* first node? */
-			pathnode->path.startup_cost = subpath->startup_cost;
-		pathnode->path.total_cost += subpath->total_cost;
+		Path *subpath = (Path*)lfirst(l);
 		/* All child paths must have same parameterization */
 		Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
@@ -943,7 +927,14 @@ create_merge_append_path(PlannerInfo *root,
 	MergeAppendPath *pathnode = makeNode(MergeAppendPath);
 	Cost		input_startup_cost;
 	Cost		input_total_cost;
+	Cost		input_pstartup_cost;
+	Cost		input_ptotal_cost;
 	ListCell   *l;
+	Cost		max_par_startup = 0.0;
+	Cost		par_running = 0.0;
+	Cost		nonpar_startup = 0.0;
+	Cost		nonpar_running = 0.0;
+	bool		parallel_exists = false;
 	pathnode->path.pathtype = T_MergeAppend;
 	pathnode->path.parent = rel;
@@ -970,14 +961,16 @@ create_merge_append_path(PlannerInfo *root,
 	foreach(l, subpaths)
 		Path	   *subpath = (Path *) lfirst(l);
+		Cost		additional_startup_cost = 0;
+		Cost		additional_total_cost = 0;
 		pathnode->path.rows += subpath->rows;
 		if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
 			/* Subpath is adequately ordered, we won't need to sort it */
-			input_startup_cost += subpath->startup_cost;
-			input_total_cost += subpath->total_cost;
+			additional_startup_cost = subpath->startup_cost;
+			additional_total_cost = subpath->total_cost;
@@ -993,18 +986,65 @@ create_merge_append_path(PlannerInfo *root,
-			input_startup_cost += sort_path.startup_cost;
-			input_total_cost += sort_path.total_cost;
+			additional_startup_cost = sort_path.startup_cost;
+			additional_total_cost = sort_path.total_cost;
+		}
+		input_startup_cost += additional_startup_cost;
+		input_total_cost += additional_total_cost;
+		/*
+		 * Cost calculation for parallel execution: this subpath's contribution
+		 * reduced by the startup and running savings of running the subpath
+		 * itself in parallel.
+		 */
+		if (subpath->parallel)
+		{
+			Cost thiscost = additional_startup_cost -
+				(subpath->startup_cost - subpath->pstartup_cost);
+			if (max_par_startup < thiscost)
+				max_par_startup = thiscost;
+			par_running +=
+				(subpath->ptotal_cost - subpath->pstartup_cost) +
+				(additional_total_cost - additional_startup_cost) -
+				(subpath->total_cost - subpath->startup_cost);
+			parallel_exists = true;
+		}
+		else
+		{
+			/* same startup contribution as in input_startup_cost */
+			nonpar_startup += additional_startup_cost;
+			nonpar_running +=
+				additional_total_cost - additional_startup_cost;
 		}
 		/* All child paths must have same parameterization */
 		Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
+	if (parallel_exists)
+	{
+		input_pstartup_cost =
+			(max_par_startup < nonpar_startup ?
+			 nonpar_startup : max_par_startup);
+		input_ptotal_cost =
+			input_pstartup_cost +
+			(par_running < nonpar_running ? nonpar_running : par_running);
+	}
+	else
+	{
+		/* unused by cost_merge_append, but never pass indeterminate values */
+		input_pstartup_cost = input_startup_cost;
+		input_ptotal_cost = input_total_cost;
+	}
 	/* Now we can compute total costs of the MergeAppend */
 	cost_merge_append(&pathnode->path, root,
 					  pathkeys, list_length(subpaths),
 					  input_startup_cost, input_total_cost,
+					  parallel_exists,
+					  input_pstartup_cost, input_ptotal_cost,
 	return pathnode;
@@ -1066,6 +1096,14 @@ create_material_path(RelOptInfo *rel, Path *subpath)
+	/* Shift the material costs by the subpath's parallel startup difference */
+	if (subpath->parallel)
+	{
+		Cost diff = subpath->pstartup_cost - subpath->startup_cost;
+		pathnode->path.pstartup_cost = pathnode->path.startup_cost + diff;
+		pathnode->path.ptotal_cost   = pathnode->path.total_cost + diff;
+		pathnode->path.parallel = true;
+	}
 	return pathnode;
@@ -1292,6 +1328,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		pathnode->path.total_cost = subpath->total_cost;
 		pathnode->path.pathkeys = subpath->pathkeys;
+		if (subpath->parallel)
+		{
+			pathnode->path.pstartup_cost = subpath->pstartup_cost;
+			pathnode->path.ptotal_cost = subpath->ptotal_cost;
+			pathnode->path.parallel = true;
+		}
 		rel->cheapest_unique_path = (Path *) pathnode;
@@ -1328,6 +1370,12 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 				pathnode->path.total_cost = subpath->total_cost;
 				pathnode->path.pathkeys = subpath->pathkeys;
+				if (subpath->parallel)
+				{
+					pathnode->path.pstartup_cost = subpath->pstartup_cost;
+					pathnode->path.ptotal_cost = subpath->ptotal_cost;
+					pathnode->path.parallel = true;
+				}
 				rel->cheapest_unique_path = (Path *) pathnode;
@@ -1407,6 +1455,24 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		pathnode->path.total_cost = sort_path.total_cost;
+	/*
+	 * Calculate parallel execution costs from the costs just stored in
+	 * pathnode, which -- unlike sort_path -- are presumably those of the
+	 * unique-ification method actually chosen (NOTE(review): confirm for
+	 * the hashed case).
+	 */
+	if (subpath->parallel)
+	{
+		pathnode->path.pstartup_cost =
+			pathnode->path.startup_cost - subpath->startup_cost
+			+ subpath->pstartup_cost;
+		pathnode->path.ptotal_cost =
+			pathnode->path.total_cost - subpath->total_cost
+			+ subpath->ptotal_cost;
+		pathnode->path.parallel = true;
+	}
 	rel->cheapest_unique_path = (Path *) pathnode;
@@ -1576,9 +1637,10 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
 ForeignPath *
 create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
 						double rows, Cost startup_cost, Cost total_cost,
 						List *pathkeys,
 						Relids required_outer,
+						bool parallel,
 						List *fdw_private)
 	ForeignPath *pathnode = makeNode(ForeignPath);
@@ -1589,6 +1649,10 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.rows = rows;
 	pathnode->path.startup_cost = startup_cost;
 	pathnode->path.total_cost = total_cost;
+	pathnode->path.parallel = parallel;
+	/* By default, parallel costs equal the ordinary ones */
+	pathnode->path.pstartup_cost = startup_cost;
+	pathnode->path.ptotal_cost = total_cost;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->fdw_private = fdw_private;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..cf3f104 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2691,6 +2691,26 @@ static struct config_real ConfigureNamesReal[] =
 		0.5, 0.0, 1.0,
+	{
+		{"parallel_cost_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Sets the minimum path total cost at which parallel execution is considered."),
+			NULL
+		},
+		&parallel_cost_threshold,
+		10000.0, 0.0, 1.0e10,
+		NULL, NULL, NULL
+	},
+	{
+		{"parallel_ratio_threshold", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Sets the ratio of parallel cost to non-parallel cost below which a parallel plan is chosen."),
+			NULL
+		},
+		&parallel_ratio_threshold,
+		0.8, 0.0, 10.0,
+		NULL, NULL, NULL
+	},
 	/* End-of-list marker */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 3b9c683..f4f9deb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -95,6 +95,9 @@ typedef struct Plan
 	Cost		startup_cost;	/* cost expended before fetching any tuples */
 	Cost		total_cost;		/* total cost (assuming all tuples fetched) */
+	bool		parallel_start; /* node includes parts run in parallel */
+	Cost		pstartup_cost;  /* startup cost for parallel execution */
+	Cost		ptotal_cost;    /* total cost for parallel execution */
 	 * planner's estimate of result size of this plan step
@@ -264,6 +267,7 @@ typedef struct Scan
 	Plan		plan;
 	Index		scanrelid;		/* relid is index into the range table */
+	bool		parallel;		/* this scan runs in parallel */
 } Scan;
 /* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index dacbe9c..08d59aa 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -729,6 +729,10 @@ typedef struct Path
 	Cost		startup_cost;	/* cost expended before fetching any tuples */
 	Cost		total_cost;		/* total cost (assuming all tuples fetched) */
+	bool		parallel;		/* can this path run in parallel? */
+	Cost		pstartup_cost;	/* startup cost for parallel execution */
+	Cost		ptotal_cost;	/* total cost for parallel execution */
 	List	   *pathkeys;		/* sort ordering of path's output */
 	/* pathkeys is a List of PathKey nodes; see above */
 } Path;
@@ -1626,6 +1630,10 @@ typedef struct JoinCostWorkspace
 	Cost		startup_cost;	/* cost expended before fetching any tuples */
 	Cost		total_cost;		/* total cost (assuming all tuples fetched) */
+	bool		parallel;		/* can this join run in parallel? */
+	Cost		pstartup_cost;	/* startup cost for parallel execution */
+	Cost		ptotal_cost;	/* total cost for parallel execution */
 	/* Fields below here should be treated as private to costsize.c */
 	Cost		run_cost;		/* non-startup cost components */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 75e2afb..30b9858 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -91,9 +91,12 @@ extern void cost_sort(Path *path, PlannerInfo *root,
 		  List *pathkeys, Cost input_cost, double tuples, int width,
 		  Cost comparison_cost, int sort_mem,
 		  double limit_tuples);
+extern void cost_append(Path *path, List *subpaths);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
 				  List *pathkeys, int n_streams,
 				  Cost input_startup_cost, Cost input_total_cost,
+				  bool parallel_estimate,
+				  Cost input_pstartup_cost, Cost input_ptotal_cost,
 				  double tuples);
 extern void cost_material(Path *path,
 			  Cost input_startup_cost, Cost input_total_cost,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index a0bcc82..b84d176 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -79,9 +79,10 @@ extern Path *create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
 						  Relids required_outer);
 extern ForeignPath *create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
 						double rows, Cost startup_cost, Cost total_cost,
 						List *pathkeys,
 						Relids required_outer,
+						bool parallel,
 						List *fdw_private);
 extern Relids calc_nestloop_required_outer(Path *outer_path, Path *inner_path);
 extern Relids calc_non_nestloop_required_outer(Path *outer_path, Path *inner_path);
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 9b22fda..36879f1 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -20,8 +20,10 @@
  * allpaths.c
 extern bool enable_geqo;
 extern int	geqo_threshold;
+extern double parallel_cost_threshold;
+extern double parallel_ratio_threshold;
 /* Hook for plugins to replace standard_join_search() */
 typedef RelOptInfo *(*join_search_hook_type) (PlannerInfo *root,
@@ -30,6 +32,7 @@ typedef RelOptInfo *(*join_search_hook_type) (PlannerInfo *root,
 extern PGDLLIMPORT join_search_hook_type join_search_hook;
+extern void choose_parallel_scans(RelOptInfo *rel);
 extern RelOptInfo *make_one_rel(PlannerInfo *root, List *joinlist);
 extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
 					 List *initial_rels);
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index 4504250..e72ad72 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -44,7 +44,7 @@ extern Plan *create_plan(PlannerInfo *root, Path *best_path);
 extern SubqueryScan *make_subqueryscan(List *qptlist, List *qpqual,
 				  Index scanrelid, Plan *subplan);
 extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual,
-				 Index scanrelid, List *fdw_exprs, List *fdw_private);
+			 Index scanrelid, bool parallel, List *fdw_exprs, List *fdw_private);
 extern Append *make_append(List *appendplans, List *tlist);
 extern RecursiveUnion *make_recursive_union(List *tlist,
 					 Plan *lefttree, Plan *righttree, int wtParam,
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..d252eb1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,11 +44,15 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PGconn	   *conn;			/* primary connection to foreign server */
+	List	   *extraconns;		/* Other connection list of AdditionalConns */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
+	char 	   *snapshot_id;	/* snapshot id for this server */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	int			reserved_aux_conns; /* Number of reserved rooms for parallel
+									 * connection */
 } ConnCacheEntry;
@@ -75,31 +79,16 @@ static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId parentSubid,
 					   void *arg);
- * Get a PGconn which can be used to execute queries on the remote PostgreSQL
- * server with the user's authorization.  A new connection is established
- * if we don't already have a suitable one, and a transaction is opened at
- * the right subtransaction nesting depth if we didn't do that already.
- *
- * will_prep_stmt must be true if caller intends to create any prepared
- * statements.  Since those don't go away automatically at transaction end
- * (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches.  For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry.  We could not flush any active connections
- * mid-transaction anyway.
+ * Search connection cache entry for specified server and user. Creates new
+ * one if not exists.
-PGconn *
-GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt)
+static ConnCacheEntry*
+search_cache_entry(ForeignServer *server, UserMapping *user)
-	bool		found;
-	ConnCacheEntry *entry;
 	ConnCacheKey key;
+	ConnCacheEntry *entry;
+	bool	found;
 	/* First time through, initialize connection cache hashtable */
 	if (ConnectionHash == NULL)
@@ -124,9 +113,6 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
 	key.serverid = server->serverid;
 	key.userid = user->userid;
@@ -139,11 +125,52 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		/* initialize new hashtable entry (key is already filled in) */
 		entry->conn = NULL;
+		entry->extraconns = NIL;
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->reserved_aux_conns = 0;
+		entry->snapshot_id = NULL;
+	return entry;
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization.  A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements.  Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * Create new auxiliary connection to be used for parallel query if parallel
+ * is true. The number of auxiliary connections assumed to be limited by the
+ * caller using ReserveParallelConnection().
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches.  For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry.  We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(ForeignServer *server, UserMapping *user,
+			  bool will_prep_stmt, bool parallel)
+	ConnCacheEntry *entry;
+	PGconn		*retconn;
+	Assert(!(will_prep_stmt && parallel));
+	entry = search_cache_entry(server, user);
+	/* Set flag that we did GetConnection during the current transaction */
+	xact_got_connection = true;
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
@@ -160,20 +187,102 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->snapshot_id = NULL;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
 			 entry->conn, server->servername);
-	/*
-	 * Start a new transaction or subtransaction if needed.
-	 */
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
-	return entry->conn;
+	retconn = entry->conn;
+	if (parallel)
+	{
+		PGconn		*conn;
+		PGresult	*res;
+		char		 sql[64];
+		MemoryContext oldcontext;
+		/* Parallel query needs exported snapshot. */
+		if (entry->snapshot_id == NULL)
+			return NULL;
+		/* Make new connection and setup it. */
+		conn = connect_pg_server(server, user);
+		if (IsolationIsSerializable())
+		else
+		res = PQexec(conn, sql);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+		/*
+		 * Snapshot setup for parallel query. If the snapshot id could be get
+		 * from this server, this should succeed on the same server.
+		 */
+		snprintf(sql, 64, "SET TRANSACTION SNAPSHOT \'%s\'", entry->snapshot_id);
+		res = PQexec(conn, sql);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+		/* This list should be in the same context with connection cache */
+		oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+		entry->extraconns = lappend(entry->extraconns, conn);
+		MemoryContextSwitchTo(oldcontext);
+		retconn = conn;
+	}
+	elog(DEBUG3, "Get %s connection (parallels %d): %p",
+		 parallel ? "parallel" : "base", list_length(entry->extraconns),
+		 retconn);
+	return retconn;
+ * Reserve a slot for one auxiliary (parallel) connection to the given
+ * server/user pair, allowing at most aux_conn_limit reservations per pair.
+ *
+ * Returns true if a slot was reserved.  Returns false if user is NULL or the
+ * limit has already been reached, so a limit of zero or less never reserves
+ * anything.  Only a counter in the connection cache entry is bumped (the
+ * entry is created if missing); no connection is opened here.
+ * ResetReservedParallelConnections() sets the counter back to zero.
+ */
+ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+						  int aux_conn_limit)
+	ConnCacheEntry *entry;
+	/*
+	 * Callers may pass a NULL user mapping (postgresGetForeignRelSize does
+	 * when use_remote_estimate is off); then no parallel slot is reserved.
+	 */
+	if (!user)
+		return false;
+	entry = search_cache_entry(server, user);
+	if (entry->reserved_aux_conns >= aux_conn_limit)
+		return false;
+	entry->reserved_aux_conns++;
+	return true;
+ * Reset the number of auxiliary (parallel) connections reserved for the
+ * given server/user pair to zero.  Does nothing when user is NULL.
+ *
+ * This function returns void (see postgres_fdw.h), so the early exit must
+ * be a bare "return;" -- returning a value here is a C constraint violation.
+ */
+ResetReservedParallelConnections(ForeignServer *server, UserMapping *user)
+	ConnCacheEntry *entry;
+	if (!user)
+		return;
+	entry = search_cache_entry(server, user);
+	entry->reserved_aux_conns = 0;
@@ -378,6 +487,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 	if (entry->xact_depth <= 0)
 		const char *sql;
+		PGresult *res;
 		elog(DEBUG3, "starting remote transaction on connection %p",
@@ -388,6 +498,30 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
+		/* 
+		 * To avoid error on the remote server, check if we can call
+		 * pg_export_snapshot() previously.
+		 */
+		sql = "SELECT count(*) FROM pg_proc p JOIN pg_namespace n ON "
+  			       "(n.oid = p.pronamespace AND n.nspname = 'pg_catalog')"
+			"WHERE p.proname = 'pg_export_snapshot' AND pronargs = 0";
+		res = PQexec(entry->conn, sql);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+		if (strcmp(PQgetvalue(res, 0, 0), "1") == 0)
+		{
+			/* Get transaction snapshot if we can */
+			PQclear(res);
+			sql = "SELECT pg_export_snapshot()";
+			res = PQexec(entry->conn, sql);
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+			entry->snapshot_id = strdup(PQgetvalue(res, 0, 0));
+			PQclear(res);
+		}
+		else
+			PQclear(res);
@@ -406,16 +540,92 @@ begin_remote_xact(ConnCacheEntry *entry)
- * Release connection reference count created by calling GetConnection.
+ * Close all auxiliary connections of the cache entry, if any.
+ *
+ * Each connection in entry->extraconns is closed with PQfinish() without
+ * sending COMMIT first, then the list is freed and reset to NIL.  The
+ * primary connection (entry->conn) and reserved_aux_conns are left as-is.
+ * NOTE(review): any PGconn pointer a scan still holds for one of these
+ * connections is dangling after this call.
+ */
+static void
+ReleaseAllAuxConnections(ConnCacheEntry *entry)
+	ListCell *lc;
+	foreach(lc, entry->extraconns)
+	{
+		PGconn *c = (PGconn *) lfirst(lc);
+		PQfinish(c);
+	}
+	/* Only the list cells are freed here; the PGconns were freed above */
+	list_free(entry->extraconns);
+	entry->extraconns = NIL;
+ * Release connection if required.
+ *
+ * The base (primary) connection stays cached for reuse until transaction
+ * end, so releasing it is a no-op.  An auxiliary connection created for a
+ * parallel scan is committed, closed and removed from the cache entry
+ * immediately.
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(ForeignServer *server, UserMapping *user, PGconn *conn)
+	ConnCacheEntry *entry;
+	ConnCacheKey key;
+	/*
+	 * Drain any asynchronous query still in progress (a parallel scan may
+	 * have a FETCH outstanding).  Stop as soon as PQgetResult() returns NULL
+	 * so we can never spin here.
+	 */
+	while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+	{
+		PGresult *res = PQgetResult(conn);
+		if (res == NULL)
+			break;
+		PQclear(res);
+	}
+	/* Create hash key for the entry.  Assume no pad bytes in key struct */
+	key.serverid = server->serverid;
+	key.userid = user->userid;
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
+	 * Find cached entry for the connection.
+	entry = hash_search(ConnectionHash, &key, HASH_FIND, NULL);
+	if (!entry)
+	{
+		elog(WARNING, "Inconsistent connection release in postgres_fdw.");
+		return;
+	}
+	/*
+	 * Base connection: nothing to do.  In particular do NOT close the
+	 * auxiliary connections here.  Each one is owned by a parallel scan that
+	 * may still be running: executor nodes are ended one at a time, and
+	 * planning-time remote estimates release the base connection too.  That
+	 * scan would later pass its (then freed) PGconn to close_cursor() and
+	 * ReleaseConnection().  Every auxiliary connection is closed by its own
+	 * ReleaseConnection() call below, and leftovers (e.g. after an error)
+	 * are closed by the transaction callback.  For the same reason
+	 * reserved_aux_conns is not reset here.
+	 */
+	if (entry->conn == conn)
+		return;
+	/* Auxiliary connection: it must be in this entry's list */
+	if (!list_member_ptr(entry->extraconns, conn))
+	{
+		/* XXXX: This is basically a bug, but simply ignore it... */
+		elog(WARNING, "Unknown connection is tried to release. Ignore it.");
+		return;
+	}
+	/*
+	 * Commit while the connection is still listed in extraconns, so that it
+	 * is still cleaned up by the transaction callback if COMMIT fails.
+	 * Unlink and log it before PQfinish() so the pointer is never used after
+	 * it has been freed.
+	 */
+	do_sql_command(conn, "COMMIT TRANSACTION");
+	entry->extraconns = list_delete_ptr(entry->extraconns, conn);
+	elog(DEBUG3, "Parallel connection closed: %p", conn);
+	PQfinish(conn);
+	if (entry->reserved_aux_conns > 0)
+		entry->reserved_aux_conns--;
@@ -571,6 +781,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						res = PQexec(entry->conn, "DEALLOCATE ALL");
+					entry->snapshot_id = NULL;
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
@@ -612,6 +823,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 							res = PQexec(entry->conn, "DEALLOCATE ALL");
+						entry->snapshot_id = NULL;
 						entry->have_prep_stmt = false;
 						entry->have_error = false;
@@ -619,6 +831,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
+		ReleaseAllAuxConnections(entry);
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 65e7b89..5323d10 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -150,6 +150,7 @@ InitPgFdwOptions(void)
 		/* cost factors */
 		{"fdw_startup_cost", ForeignServerRelationId, false},
 		{"fdw_tuple_cost", ForeignServerRelationId, false},
+		{"max_aux_connections", ForeignServerRelationId, false},
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..3fd60e1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -76,7 +76,8 @@ typedef struct PgFdwRelationInfo
 	/* Cached catalog information. */
 	ForeignTable *table;
 	ForeignServer *server;
-	UserMapping *user;			/* only set in use_remote_estimate mode */
+	UserMapping  *user;			/* only set in use_remote_estimate mode */
+	bool		  parallel;		/* true if this rel can be scanned in parallel */
 } PgFdwRelationInfo;
@@ -136,6 +137,10 @@ typedef struct PgFdwScanState
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
+	ForeignServer *server;		/* The foreign server this scan based on */
+	UserMapping	  *user;		/* The user this scan done by */
+	bool		parallel;		/* ture if this scan is allowed to run in
+								 * parallel */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -145,6 +150,7 @@ typedef struct PgFdwScanState
 	/* for storing result tuples */
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
+	int			tupplane;		/* Which tuples currently read */
 	int			num_tuples;		/* # of tuples in array */
 	int			next_tuple;		/* index of next one to return */
@@ -168,6 +174,8 @@ typedef struct PgFdwModifyState
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
+	ForeignServer *server;		/* The foreign server this scan based on */
+	UserMapping	  *user;		/* The user this scan done by */
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
@@ -306,7 +314,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -328,6 +336,10 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
+/*
+ * True if no command is currently in progress on conn.  PQtransactionStatus()
+ * reports an idle connection as either PQTRANS_IDLE (outside a transaction
+ * block) or PQTRANS_INTRANS (idle inside a valid transaction block), so both
+ * are accepted.  Note that the argument is evaluated twice.
+ */
+#define CONN_IS_IDLE(conn) \
+	(PQtransactionStatus(conn) == PQTRANS_IDLE || \
+	 PQtransactionStatus(conn) == PQTRANS_INTRANS)
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -384,6 +396,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	PgFdwRelationInfo *fpinfo;
 	ListCell   *lc;
+	unsigned long max_aux_connections = 0;
 	 * We use PgFdwRelationInfo to pass various information to subsequent
@@ -414,6 +427,8 @@ postgresGetForeignRelSize(PlannerInfo *root,
 			fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
 		else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
 			fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
+		else if (strcmp(def->defname, "max_aux_connections") == 0)
+			max_aux_connections = strtol(defGetString(def), NULL, 10);
 	foreach(lc, fpinfo->table->options)
@@ -442,6 +457,11 @@ postgresGetForeignRelSize(PlannerInfo *root,
 		fpinfo->user = NULL;
+	/* Check if this foregn relation has the auxiliary connection available.*/
+	fpinfo->parallel =
+		ReserveParallelConnection(fpinfo->server, fpinfo->user,
+								  max_aux_connections);
 	 * Identify which baserestrictinfo clauses can be sent to the remote
 	 * server and which can't.
@@ -558,6 +578,7 @@ postgresGetForeignPaths(PlannerInfo *root,
 								   NIL, /* no pathkeys */
 								   NULL,		/* no outer rel either */
+								   fpinfo->parallel,
 								   NIL);		/* no fdw_private list */
 	add_path(baserel, (Path *) path);
@@ -725,6 +746,7 @@ postgresGetForeignPaths(PlannerInfo *root,
 									   NIL,		/* no pathkeys */
+									   fpinfo->parallel,
 									   NIL);	/* no fdw_private list */
 		add_path(baserel, (Path *) path);
@@ -752,6 +774,9 @@ postgresGetForeignPlan(PlannerInfo *root,
 	StringInfoData sql;
 	ListCell   *lc;
+	/* reset reserved number of auxiliary connections */
+	ResetReservedParallelConnections(fpinfo->server, fpinfo->user);
 	 * Separate the scan_clauses into those that can be executed remotely and
 	 * those that can't.  baserestrictinfo clauses that were previously
@@ -866,6 +891,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 	return make_foreignscan(tlist,
+							best_path->path.parallel,
@@ -888,6 +914,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	int			numParams;
 	int			i;
 	ListCell   *lc;
+	bool		parallel = fsplan->scan.parallel;
 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
@@ -918,7 +945,20 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
-	fsstate->conn = GetConnection(server, user, false);
+	fsstate->conn = GetConnection(server, user, false, parallel);
+	if (parallel && fsstate->conn == NULL)
+	{
+		/*
+		 * Somehow no more auxiliary connection available, so this relation is
+		 * scanned in the base connection sequentially.
+		 */
+		elog(WARNING, "Failed to get parallel connection for foreign relation %d. Fall back to base connection.",
+			 RelationGetRelid(fsstate->rel));
+		parallel = false;
+		fsstate->conn = GetConnection(server, user, false, false);		
+	}
+	fsstate->server = server;
+	fsstate->user = user;
 	/* Assign a unique ID for my cursor */
 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -981,6 +1021,17 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 		fsstate->param_values = NULL;
+	fsstate->parallel = parallel;
+	if (parallel)
+	{
+		/* 
+		 * This connection is allowed asynchronous query execution, so start
+		 * it just now. 
+		 */
+		create_cursor(node);
+		fetch_more_data(node, true);
+	}
@@ -1008,7 +1059,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(node, fsstate->parallel);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1099,7 +1150,7 @@ postgresEndForeignScan(ForeignScanState *node)
 		close_cursor(fsstate->conn, fsstate->cursor_number);
 	/* Release remote connection */
-	ReleaseConnection(fsstate->conn);
+	ReleaseConnection(fsstate->server, fsstate->user, fsstate->conn);
 	fsstate->conn = NULL;
 	/* MemoryContexts will be deleted automatically. */
@@ -1301,7 +1352,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	user = GetUserMapping(userid, server->serverid);
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(server, user, true);
+	fmstate->conn = GetConnection(server, user, true, false);
+	fmstate->server = server;
+	fmstate->user = user;
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 	/* Deconstruct fdw_private data. */
@@ -1599,7 +1652,7 @@ postgresEndForeignModify(EState *estate,
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
+	ReleaseConnection(fmstate->server, fmstate->user, fmstate->conn);
 	fmstate->conn = NULL;
@@ -1751,10 +1804,11 @@ estimate_path_cost_size(PlannerInfo *root,
 							  (fpinfo->remote_conds == NIL), NULL);
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->server, fpinfo->user, false);
+		conn = GetConnection(fpinfo->server, fpinfo->user, false, false);
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
-		ReleaseConnection(conn);
+		ReleaseConnection(fpinfo->server, fpinfo->user, conn);
 		retrieved_rows = rows;
@@ -2001,10 +2055,12 @@ create_cursor(ForeignScanState *node)
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. async indicates that this
+ * query runs on the dedicated connection and requested asynchronous query
+ * execution.
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async)
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
@@ -2026,6 +2082,7 @@ fetch_more_data(ForeignScanState *node)
 		int			fetch_size;
 		int			numrows;
 		int			i;
+		bool		skip_get_result = false;
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
 		fetch_size = 100;
@@ -2033,36 +2090,81 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
-		res = PQexec(conn, sql);
-		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+		if (async && CONN_IS_IDLE(conn))
+		{
+			if (!PQsendQuery(conn, sql))
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+			/*
+			 * IDLE connection state on async query means that this is the
+			 * first call for this query, so return immediately. See
+			 * postgresBeginForeignScan()
+			 */
+			skip_get_result = true;
+		}
-		for (i = 0; i < numrows; i++)
+		if (!skip_get_result)
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   fsstate->temp_cxt);
-		}
+			if (async)
+			{
+				res = PQgetResult(conn);
+				if (PQntuples(res) == fetch_size)
+				{
+					/*
+					 * Connection state doesn't go to IDLE even if all data
+					 * has been sent to client for asynchronous query. One
+					 * more PQgetResult() is needed to reset the state to
+					 * IDLE.  See PQexecFinish() for details.
+					 */
+					if (PQgetResult(conn) != NULL)
+						elog(ERROR, "Connection status error.");
+				}
+			}
+			else
+				res = PQexec(conn, sql);
-		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
-			fsstate->fetch_ct_2++;
+			/* On error, report the original query, not the FETCH. */
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+			/* Convert the data into HeapTuples */
+			numrows = PQntuples(res);
+			fsstate->tuples =
+					(HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
-		PQclear(res);
-		res = NULL;
+			for (i = 0; i < numrows; i++)
+			{
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											   fsstate->rel,
+											   fsstate->attinmeta,
+											   fsstate->retrieved_attrs,
+											   fsstate->temp_cxt);
+			}
+			/* Update fetch_ct_2 */
+			if (fsstate->fetch_ct_2 < 2)
+				fsstate->fetch_ct_2++;
+			/* Must be EOF if we didn't get as many tuples as we asked for. */
+			fsstate->eof_reached = (numrows < fetch_size);
+			PQclear(res);
+			res = NULL;
+		}
+		if (async && !skip_get_result && !fsstate->eof_reached)
+		{
+			/*
+			 * We can immediately request the next bunch of tuples if we're on
+			 * asynchronous connection.
+			 */
+			if (!PQsendQuery(conn, sql))
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+		}
@@ -2315,8 +2417,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, false);
 	 * Construct command to get page count for relation.
@@ -2345,7 +2446,7 @@ postgresAnalyzeForeignTable(Relation relation,
-	ReleaseConnection(conn);
+	ReleaseConnection(server, user, conn);
 	return true;
@@ -2407,7 +2508,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, false);
 	 * Construct cursor that retrieves whole rows from remote.
@@ -2479,7 +2580,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
-	ReleaseConnection(conn);
+	ReleaseConnection(server, user, conn);
 	/* We assume that we have no dead tuple. */
 	*totaldeadrows = 0.0;
@@ -2609,7 +2710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(server, mapping, false);
+	conn = GetConnection(server, mapping, false, false);
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
@@ -2826,7 +2927,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
-	ReleaseConnection(conn);
+	ReleaseConnection(server, mapping, conn);
 	return commands;
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 94eadae..387b2ff 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -25,10 +25,16 @@ extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
 /* in connection.c */
+extern bool ReserveParallelConnection(ForeignServer *server, UserMapping *user,
+									  int aux_conn_limit);
+extern void ResetReservedParallelConnections(ForeignServer *server,
+											 UserMapping *user);
 extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
+							 bool will_prep_stmt, bool parallel);
+extern void ReleaseConnection(ForeignServer *server, UserMapping *user,
+							  PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
+extern int GetServerOrdinate(Oid serverid);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 5a4d5aa..7a12542 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -524,6 +524,7 @@ fileGetForeignPaths(PlannerInfo *root,
 									 NIL,		/* no pathkeys */
 									 NULL,		/* no outer rel either */
+									 false,
@@ -560,6 +561,7 @@ fileGetForeignPlan(PlannerInfo *root,
 	return make_foreignscan(tlist,
+							best_path->path.parallel,
 							NIL,	/* no expressions to evaluate */

-- Test script for parallel foreign scans with postgres_fdw.
-- The foreign server loops back to the local 'postgres' database.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true',
           max_aux_connections '3');

-- postgres_fdw cannot connect without a user mapping for the local role.
-- (Run as a superuser; a non-superuser mapping also needs a password option.)
CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;

CREATE TABLE t (a int, b int, c text);
INSERT INTO t (SELECT random() * 10000, random() * 10000,
                      repeat('X', (random() * 1000)::int)
               FROM generate_series(0, 2999));

CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2
  FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;

CREATE FOREIGN TABLE fts1 (a int, b int, c text) SERVER pgs1
  OPTIONS (table_name 't');
CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text)
  SERVER pgs1 OPTIONS (table_name 'v');

-- Compare the plans chosen under two parallel_ratio_threshold settings.
SET parallel_ratio_threshold to 1.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1 b on (a.a = b.a);

SET parallel_ratio_threshold to 0.0;

EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1 b on (a.a = b.a);
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:

Reply via email to