From 0ae7b130d7ded5765b30cc6c6a37d8b882809a8c Mon Sep 17 00:00:00 2001
From: Hari Babu Kommi <kommih@localhost.localdomain>
Date: Tue, 29 Aug 2017 10:34:34 +1000
Subject: [PATCH] Make parallel eligible for utility commands underneath
 queries

This functionality adds parallel query support to the queries
that are run as part of some utility commands, such as
CREATE MATERIALIZED VIEW and CREATE TABLE AS.
---
 doc/src/sgml/parallel.sgml                   | 32 +++++++----
 src/backend/access/heap/heapam.c             | 10 ++--
 src/backend/commands/createas.c              |  4 +-
 src/backend/commands/explain.c               |  4 +-
 src/backend/commands/matview.c               |  2 +-
 src/backend/executor/execMain.c              |  6 +--
 src/test/regress/expected/write_parallel.out | 80 ++++++++++++++++++++++++++++
 src/test/regress/parallel_schedule           |  1 +
 src/test/regress/serial_schedule             |  1 +
 src/test/regress/sql/write_parallel.sql      | 43 +++++++++++++++
 10 files changed, 158 insertions(+), 25 deletions(-)
 create mode 100644 src/test/regress/expected/write_parallel.out
 create mode 100644 src/test/regress/sql/write_parallel.sql

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 2a25f21..85f1d38 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -243,15 +243,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        A prepared statement is executed using a <literal>CREATE TABLE .. AS
-        EXECUTE ..</literal> statement.  This construct converts what otherwise
-        would have been a read-only operation into a read-write operation,
-        making it ineligible for parallel query.
-      </para>
-    </listitem>
-
-    <listitem>
-      <para>
         The transaction isolation level is serializable.  This situation
         does not normally arise, because parallel query plans are not
         generated when the transaction isolation level is serializable.
@@ -391,7 +382,30 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
   </para>
 
  </sect2>
+
+ <sect2 id="writes-using-parallel-plans">
+  <title>Write Operations Using Parallel Plans</title>
 
+   <para>
+    Currently, only a limited number of write operations can
+    benefit from parallelism.  These are:
+   </para>
+
+  <itemizedlist>
+   <listitem>
+    <para>
+     A utility statement that creates a table or a materialized view,
+     such as <literal>CREATE TABLE .. AS</literal> or <literal>CREATE
+     MATERIALIZED VIEW .. AS</literal>.
+     The writes themselves are not performed concurrently by the parallel
+     workers, but the underlying query used by these operations is
+     eligible for parallel plans.
+    </para>
+   </listitem>
+  </itemizedlist>
+
+ </sect2>
+
  <sect2 id="parallel-plan-tips">
   <title>Parallel Plan Tips</title>
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e29c5ad..47de10e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2595,15 +2595,13 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
 {
 	/*
-	 * For now, parallel operations are required to be strictly read-only.
-	 * Unlike heap_update() and heap_delete(), an insert should never create a
-	 * combo CID, so it might be possible to relax this restriction, but not
-	 * without more thought and testing.
+	 * For now, parallel operations are required to be strictly read-only in
+	 * parallel worker.
 	 */
-	if (IsInParallelMode())
+	if (IsParallelWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples during a parallel operation")));
+				 errmsg("cannot insert tuples in a parallel worker")));
 
 	if (relation->rd_rel->relhasoids)
 	{
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index e60210c..4d77411 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -326,8 +326,8 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		query = linitial_node(Query, rewritten);
 		Assert(query->commandType == CMD_SELECT);
 
-		/* plan the query --- note we disallow parallelism */
-		plan = pg_plan_query(query, 0, params);
+		/* plan the query */
+		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, params);
 
 		/*
 		 * Use a snapshot with an updated command ID to ensure this query sees
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 4cee357..c13efc9 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -400,8 +400,6 @@ ExplainOneUtility(Node *utilityStmt, IntoClause *into, ExplainState *es,
 		 * We have to rewrite the contained SELECT and then pass it back to
 		 * ExplainOneQuery.  It's probably not really necessary to copy the
 		 * contained parsetree another time, but let's be safe.
-		 *
-		 * Like ExecCreateTableAs, disallow parallelism in the plan.
 		 */
 		CreateTableAsStmt *ctas = (CreateTableAsStmt *) utilityStmt;
 		List	   *rewritten;
@@ -409,7 +407,7 @@ ExplainOneUtility(Node *utilityStmt, IntoClause *into, ExplainState *es,
 		rewritten = QueryRewrite(castNode(Query, copyObject(ctas->query)));
 		Assert(list_length(rewritten) == 1);
 		ExplainOneQuery(linitial_node(Query, rewritten),
-						0, ctas->into, es,
+						CURSOR_OPT_PARALLEL_OK, ctas->into, es,
 						queryString, params, queryEnv);
 	}
 	else if (IsA(utilityStmt, DeclareCursorStmt))
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index d2e0376..b6fa062 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -404,7 +404,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	CHECK_FOR_INTERRUPTS();
 
 	/* Plan the query which will generate data for the refresh. */
-	plan = pg_plan_query(query, 0, NULL);
+	plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
 
 	/*
 	 * Use a snapshot with an updated command ID to ensure this query sees
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 2946a0e..6458f0b 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1689,11 +1689,9 @@ ExecutePlan(EState *estate,
 
 	/*
 	 * If the plan might potentially be executed multiple times, we must force
-	 * it to run without parallelism, because we might exit early.  Also
-	 * disable parallelism when writing into a relation, because no database
-	 * changes are allowed in parallel mode.
+	 * it to run without parallelism, because we might exit early.
 	 */
-	if (!execute_once || dest->mydest == DestIntoRel)
+	if (!execute_once)
 		use_parallel_mode = false;
 
 	if (use_parallel_mode)
diff --git a/src/test/regress/expected/write_parallel.out b/src/test/regress/expected/write_parallel.out
new file mode 100644
index 0000000..e549cc2
--- /dev/null
+++ b/src/test/regress/expected/write_parallel.out
@@ -0,0 +1,80 @@
+--
+-- PARALLEL
+--
+-- Serializable isolation would disable parallel query, so explicitly use an
+-- arbitrary other level.
+begin isolation level repeatable read;
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+--
+-- Test write operations that has an underlying query that is eligble
+-- for parallel plans
+--
+explain (costs off) create table parallel_write as
+    select length(stringu1) from tenk1 group by length(stringu1);
+                    QUERY PLAN                     
+---------------------------------------------------
+ Finalize HashAggregate
+   Group Key: (length((stringu1)::text))
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial HashAggregate
+               Group Key: length((stringu1)::text)
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+create table parallel_write as
+    select length(stringu1) from tenk1 group by length(stringu1);
+drop table parallel_write;
+explain (costs off) select length(stringu1) into parallel_write
+    from tenk1 group by length(stringu1);
+                    QUERY PLAN                     
+---------------------------------------------------
+ Finalize HashAggregate
+   Group Key: (length((stringu1)::text))
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial HashAggregate
+               Group Key: length((stringu1)::text)
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+select length(stringu1) into parallel_write
+    from tenk1 group by length(stringu1);
+drop table parallel_write;
+explain (costs off) create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1 group by length(stringu1);
+                    QUERY PLAN                     
+---------------------------------------------------
+ Finalize HashAggregate
+   Group Key: (length((stringu1)::text))
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial HashAggregate
+               Group Key: length((stringu1)::text)
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1 group by length(stringu1);
+Refresh materialized view parallel_mat_view;
+drop materialized view parallel_mat_view;
+prepare prep_stmt as select length(stringu1) from tenk1 group by length(stringu1);
+explain (costs off) create table parallel_write as execute prep_stmt;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Finalize HashAggregate
+   Group Key: (length((stringu1)::text))
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial HashAggregate
+               Group Key: length((stringu1)::text)
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
+create table parallel_write as execute prep_stmt;
+drop table parallel_write;
+rollback;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 2fd3f2b..860e8ab 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -96,6 +96,7 @@ test: rules psql_crosstab amutils
 
 # run by itself so it can run parallel workers
 test: select_parallel
+test: write_parallel
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 76b0de3..ef275d0 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -134,6 +134,7 @@ test: stats_ext
 test: rules
 test: psql_crosstab
 test: select_parallel
+test: write_parallel
 test: publication
 test: subscription
 test: amutils
diff --git a/src/test/regress/sql/write_parallel.sql b/src/test/regress/sql/write_parallel.sql
new file mode 100644
index 0000000..00f9156
--- /dev/null
+++ b/src/test/regress/sql/write_parallel.sql
@@ -0,0 +1,43 @@
+--
+-- PARALLEL
+--
+
+-- Serializable isolation would disable parallel query, so explicitly use an
+-- arbitrary other level.
+begin isolation level repeatable read;
+
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+
+--
+-- Test write operations that has an underlying query that is eligble
+-- for parallel plans
+--
+explain (costs off) create table parallel_write as
+    select length(stringu1) from tenk1 group by length(stringu1);
+create table parallel_write as
+    select length(stringu1) from tenk1 group by length(stringu1);
+drop table parallel_write;
+
+explain (costs off) select length(stringu1) into parallel_write
+    from tenk1 group by length(stringu1);
+select length(stringu1) into parallel_write
+    from tenk1 group by length(stringu1);
+drop table parallel_write;
+
+explain (costs off) create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1 group by length(stringu1);
+create materialized view parallel_mat_view as
+    select length(stringu1) from tenk1 group by length(stringu1);
+Refresh materialized view parallel_mat_view;
+drop materialized view parallel_mat_view;
+
+prepare prep_stmt as select length(stringu1) from tenk1 group by length(stringu1);
+explain (costs off) create table parallel_write as execute prep_stmt;
+create table parallel_write as execute prep_stmt;
+drop table parallel_write;
+
+rollback;
-- 
1.8.3.1

