From 40df802e38e18c96a4672062f1305405c1641a4a Mon Sep 17 00:00:00 2001
From: Juan Jose Santamaria Flecha <juanjo.santamaria@gmail.com>
Date: Mon, 14 Aug 2023 11:10:06 -0400
Subject: [PATCH] =?UTF-8?q?Allow=20parallel=20plan=20for=20referential=20i?=
 =?UTF-8?q?ntegrity=20checks=20Based=20on=20previous=20work=20from=20Fr?=
 =?UTF-8?q?=C3=A9d=C3=A9ric=20Yhuel?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/backend/utils/adt/ri_triggers.c       | 19 ++++++++++++++++---
 src/test/regress/expected/alter_table.out | 10 ++++++++++
 src/test/regress/sql/alter_table.sql      | 11 +++++++++++
 3 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 6945d99..b7f92ed 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -1383,8 +1383,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	const char *pk_only;
 	int			save_nestlevel;
 	char		workmembuf[32];
+	char		maxmntworkers[4];
 	int			spi_result;
 	SPIPlanPtr	qplan;
+	SPIPrepareOptions options;
 
 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
 
@@ -1531,6 +1533,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 		}
 	}
 	appendStringInfoChar(&querybuf, ')');
+	elog(DEBUG2, "The RI_Initial_Check() query string built is \"%s\"",
+		 querybuf.data);
 
 	/*
 	 * Temporarily increase work_mem so that the check query can be executed
@@ -1540,7 +1544,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	 * this seems to meet the criteria for being considered a "maintenance"
 	 * operation, and accordingly we use maintenance_work_mem.  However, we
 	 * must also set hash_mem_multiplier to 1, since it is surely not okay to
-	 * let that get applied to the maintenance_work_mem value.
+	 * let that get applied to the maintenance_work_mem value. In the same
+	 * fashion, cap parallel processes by max_parallel_maintenance_workers.
 	 *
 	 * We use the equivalent of a function SET option to allow the setting to
 	 * persist for exactly the duration of the check query.  guc.c also takes
@@ -1549,12 +1554,18 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	save_nestlevel = NewGUCNestLevel();
 
 	snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
+	/* max_parallel_maintenance_workers <= 1024, so maxmntworkers is char[4] */
+	snprintf(maxmntworkers, sizeof(maxmntworkers), "%d",
+			 max_parallel_maintenance_workers);
 	(void) set_config_option("work_mem", workmembuf,
 							 PGC_USERSET, PGC_S_SESSION,
 							 GUC_ACTION_SAVE, true, 0, false);
 	(void) set_config_option("hash_mem_multiplier", "1",
 							 PGC_USERSET, PGC_S_SESSION,
 							 GUC_ACTION_SAVE, true, 0, false);
+	(void) set_config_option("max_parallel_workers_per_gather", maxmntworkers,
+							 PGC_USERSET, PGC_S_SESSION,
+							 GUC_ACTION_SAVE, true, 0, false);
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "SPI_connect failed");
@@ -1563,10 +1574,12 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	 * Generate the plan.  We don't need to cache it, and there are no
 	 * arguments to the plan.
 	 */
-	qplan = SPI_prepare(querybuf.data, 0, NULL);
+	memset(&options, 0, sizeof(options));
+	options.cursorOptions |= CURSOR_OPT_PARALLEL_OK;
+	qplan = SPI_prepare_extended(querybuf.data, &options);
 
 	if (qplan == NULL)
-		elog(ERROR, "SPI_prepare returned %s for %s",
+		elog(ERROR, "SPI_prepare_extended returned %s for %s",
 			 SPI_result_code_string(SPI_result), querybuf.data);
 
 	/*
diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out
index cd814ff..0d2a634 100644
--- a/src/test/regress/expected/alter_table.out
+++ b/src/test/regress/expected/alter_table.out
@@ -4658,3 +4658,13 @@ drop publication pub1;
 drop schema alter1 cascade;
 drop schema alter2 cascade;
 NOTICE:  drop cascades to table alter2.t1
+-- ALTER TABLE operations that can be parallel
+CREATE TABLE parallel_pk_table (a int) WITH (autovacuum_enabled = off);
+CREATE TABLE parallel_fk_table (a int) WITH (autovacuum_enabled = off);
+SET max_parallel_maintenance_workers TO 4;
+SET parallel_setup_cost TO 0;
+SET parallel_tuple_cost TO 0;
+SET parallel_leader_participation TO 0;
+SET min_parallel_table_scan_size TO 0;
+ALTER TABLE parallel_pk_table ADD PRIMARY KEY (a);
+ALTER TABLE parallel_fk_table ADD CONSTRAINT parallel_fk FOREIGN KEY (a) REFERENCES parallel_pk_table (a);
diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql
index ff8c498..6256bdb 100644
--- a/src/test/regress/sql/alter_table.sql
+++ b/src/test/regress/sql/alter_table.sql
@@ -3066,3 +3066,14 @@ alter table alter1.t1 set schema alter2;
 drop publication pub1;
 drop schema alter1 cascade;
 drop schema alter2 cascade;
+
+-- ALTER TABLE operations that can be parallel
+CREATE TABLE parallel_pk_table (a int) WITH (autovacuum_enabled = off);
+CREATE TABLE parallel_fk_table (a int) WITH (autovacuum_enabled = off);
+SET max_parallel_maintenance_workers TO 4;
+SET parallel_setup_cost TO 0;
+SET parallel_tuple_cost TO 0;
+SET parallel_leader_participation TO 0;
+SET min_parallel_table_scan_size TO 0;
+ALTER TABLE parallel_pk_table ADD PRIMARY KEY (a);
+ALTER TABLE parallel_fk_table ADD CONSTRAINT parallel_fk FOREIGN KEY (a) REFERENCES parallel_pk_table (a);
-- 
2.11.0

