On Fri, Oct 7, 2022 at 7:17 PM Amit Langote <amitlangot...@gmail.com> wrote:
> On Fri, Oct 7, 2022 at 19:15 Alvaro Herrera <alvhe...@alvh.no-ip.org> wrote:
>> On 2022-Oct-07, Amit Langote wrote:
>> > > Thanks for the heads up.  Hmm, this I am not sure how to reproduce on
>> > > my own, so I am currently left with second-guessing what may be going
>> > > wrong on 32 bit machines with whichever of the 4 patches.
>> > >
>> > > For now, I'll just post 0001, which I am claiming has no semantic
>> > > changes (proof pending), to rule out that that one's responsible.
>> >
>> > Nope, not 0001.  Here's 0001+0002.

I had forgotten to actually attach anything with that email.

>> Please note that you can set up a github repository so that cirrus-ci
>> tests whatever patches you like, without having to post them to
>> pg-hackers.  See src/tools/ci/README, it takes three minutes if you
>> already have the account and repository.
>
> Ah, that’s right.  Will do so, thanks for the suggestion.

I'm waiting to hear from GitHub Support to resolve an error I'm facing
trying to add Cirrus CI to my account.


--
Thanks, Amit Langote
EDB: http://www.enterprisedb.com
From 363e7539afea9b5ef287865b5176395818e880df Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Tue, 28 Jun 2022 17:15:51 +0900
Subject: [PATCH v6 1/4] Avoid using SPI in RI trigger functions

Currently, ri_PlanCheck() uses SPI_prepare() to get an "SPI plan"
containing a CachedPlanSource for the SQL query that a given RI
trigger function uses to implement an RI check.  Furthermore,
ri_PerformCheck() calls SPI_execute_snapshot() on the "SPI plan"
to execute the query for a given snapshot.

This commit invents ri_PlanCreate() and ri_PlanExecute() to take
the place of SPI_prepare() and SPI_execute_snapshot(), respectively.

ri_PlanCreate() will create an "RI plan" for a given query, using a
callback function specified by the caller of ri_PlanCheck().  For
example, the callback ri_SqlStringPlanCreate() will
produce a CachedPlanSource for the input SQL string, just as
SPI_prepare() would.

ri_PlanExecute() will execute the "RI plan" by calling a
caller-specific callback function whose pointer is saved within the
"RI plan" data structure (struct RI_Plan).  For example, the callback
ri_SqlStringPlanExecute() will fetch a CachedPlan for each
CachedPlanSource found in the "RI plan" and execute its PlannedStmts
by invoking the executor, just as SPI_execute_snapshot() would.
Details such as which snapshot to use are now fully controlled by
ri_PerformCheck(), whereas the previous arrangement relied on the
SPI logic for snapshot management.

ri_PlanCreate(), ri_PlanExecute(), and the "RI plan" data structure
they manipulate are pluggable such that it will be possible for
future commits to replace the current SQL string based implementation
of some RI checks with something as simple as a C function to directly
scan the underlying table/index of the referencing or the referenced
table.

NB: RI_Initial_Check() and RI_PartitionRemove_Check() still use the
SPI_prepare()/SPI_execute_snapshot() combination, because I
haven't yet added a proper DestReceiver in ri_SqlStringPlanExecute()
to receive and process the tuples that the execution would produce,
which those RI_* functions will need.
---
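
Not part of the patch: a minimal standalone sketch of the callback
arrangement described in the commit message above, written in plain C
with no PostgreSQL headers.  Every Demo*/demo_* name is hypothetical and
only mirrors the shape of RI_Plan and its create/execute/is-valid/free
callbacks; the "execution" is a toy that counts characters, not a query.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for RI_Plan: callbacks plus an opaque argument. */
struct DemoPlan;
typedef void (*DemoCreateFunc) (struct DemoPlan *plan, const char *querystr);
typedef int (*DemoExecFunc) (struct DemoPlan *plan, int limit);
typedef int (*DemoIsValidFunc) (struct DemoPlan *plan);
typedef void (*DemoFreeFunc) (struct DemoPlan *plan);

typedef struct DemoPlan
{
    DemoExecFunc exec_func;         /* execute the plan */
    void       *exec_arg;           /* implementation-specific state */
    DemoIsValidFunc is_valid_func;  /* may the cache keep this plan? */
    DemoFreeFunc free_func;         /* release implementation state */
} DemoPlan;

/* Toy "execution": one row per character of the query, capped by limit. */
static int
demo_string_exec(DemoPlan *plan, int limit)
{
    int         n = (int) strlen((const char *) plan->exec_arg);

    return (limit > 0 && n > limit) ? limit : n;    /* limit 0 = no cap */
}

static int
demo_string_is_valid(DemoPlan *plan)
{
    return plan->exec_arg != NULL;
}

static void
demo_string_free(DemoPlan *plan)
{
    free(plan->exec_arg);
    plan->exec_arg = NULL;
}

/* The create callback fills in the other callbacks and the exec argument. */
void
demo_string_create(DemoPlan *plan, const char *querystr)
{
    size_t      len = strlen(querystr) + 1;
    char       *copy = malloc(len);

    assert(copy != NULL);
    memcpy(copy, querystr, len);
    plan->exec_arg = copy;
    plan->exec_func = demo_string_exec;
    plan->is_valid_func = demo_string_is_valid;
    plan->free_func = demo_string_free;
}

/* Generic entry points: they only dispatch through the plan's callbacks. */
DemoPlan *
demo_plan_create(DemoCreateFunc create_func, const char *querystr)
{
    DemoPlan   *plan = calloc(1, sizeof(*plan));

    assert(plan != NULL);
    create_func(plan, querystr);
    return plan;
}

int
demo_plan_execute(DemoPlan *plan, int limit)
{
    return plan->exec_func(plan, limit);
}

int
demo_plan_is_valid(DemoPlan *plan)
{
    return plan->is_valid_func(plan);
}

void
demo_plan_free(DemoPlan *plan)
{
    plan->free_func(plan);      /* implementation-specific release first */
    free(plan);
}
```

The point of the shape is that a caller picks the implementation once,
by passing a create callback, and everything after that goes through the
generic entry points.  A different create callback could install a
different executor without the callers changing.
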
 src/backend/executor/spi.c          |   2 +-
 src/backend/utils/adt/ri_triggers.c | 600 +++++++++++++++++++++++-----
 2 files changed, 490 insertions(+), 112 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index fd5796f1b9..a30553ea67 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -762,7 +762,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  * end of the command.
  *
  * This is currently not documented in spi.sgml because it is only intended
- * for use by RI triggers.
+ * for use by some functions in ri_triggers.c.
  *
  * Passing snapshot == InvalidSnapshot will select the normal behavior of
  * fetching a new snapshot for each query.
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 1d503e7e01..cfebd9c4f2 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -9,7 +9,7 @@
  *	across query and transaction boundaries, in fact they live as long as
  *	the backend does.  This works because the hashtable structures
  *	themselves are allocated by dynahash.c in its permanent DynaHashCxt,
- *	and the SPI plans they point to are saved using SPI_keepplan().
+ *	and the CachedPlanSources they point to are saved in CacheMemoryContext.
  *	There is not currently any provision for throwing away a no-longer-needed
  *	plan --- consider improving this someday.
  *
@@ -40,6 +40,8 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -127,10 +129,55 @@ typedef struct RI_ConstraintInfo
 	dlist_node	valid_link;		/* Link in list of valid entries */
 } RI_ConstraintInfo;
 
+/* RI plan callback functions */
+struct RI_Plan;
+typedef void (*RI_PlanCreateFunc_type) (struct RI_Plan *plan, const char *querystr, int nargs, Oid *paramtypes);
+typedef int (*RI_PlanExecFunc_type) (struct RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+									 Datum *param_vals, char *params_isnulls,
+									 Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+									 int limit, CmdType *last_stmt_cmdtype);
+typedef bool (*RI_PlanIsValidFunc_type) (struct RI_Plan *plan);
+typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan);
+
+/*
+ * RI_Plan
+ *
+ * Information related to the implementation of a plan for a given RI query.
+ * ri_PlanCheck() makes and stores these in ri_query_cache.  The callers of
+ * ri_PlanCheck() specify an RI_PlanCreateFunc_type function to fill in the
+ * caller-specific implementation details such as the callback functions
+ * to execute, validate, and free a plan, and also the arguments necessary for
+ * the execution of the plan.
+ */
+typedef struct RI_Plan
+{
+	/*
+	 * Context under which this struct and its subsidiary data gets allocated.
+	 * It is made a child of CacheMemoryContext.
+	 */
+	MemoryContext	plancxt;
+
+	/* Query parameter types. */
+	int				nargs;
+	Oid			   *paramtypes;
+
+	/*
+	 * Set of functions specified by a RI trigger function to implement
+	 * the plan for the trigger's RI query.
+	 */
+	RI_PlanExecFunc_type plan_exec_func;	/* execute the plan */
+	void		   *plan_exec_arg;			/* execution argument, such as
+											 * a List of CachedPlanSource */
+	RI_PlanIsValidFunc_type plan_is_valid_func; /* check if the plan is still
+												 * valid for ri_query_cache
+												 * to continue caching it */
+	RI_PlanFreeFunc_type plan_free_func;	/* release plan resources */
+} RI_Plan;
+
 /*
  * RI_QueryKey
  *
- * The key identifying a prepared SPI plan in our query hashtable
+ * The key identifying a plan in our query hashtable
  */
 typedef struct RI_QueryKey
 {
@@ -144,7 +191,7 @@ typedef struct RI_QueryKey
 typedef struct RI_QueryHashEntry
 {
 	RI_QueryKey key;
-	SPIPlanPtr	plan;
+	RI_Plan	   *plan;
 } RI_QueryHashEntry;
 
 /*
@@ -208,8 +255,8 @@ static bool ri_AttributesEqual(Oid eq_opr, Oid typeid,
 
 static void ri_InitHashTables(void);
 static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue);
-static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key);
-static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan);
+static RI_Plan *ri_FetchPreparedPlan(RI_QueryKey *key);
+static void ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan);
 static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid);
 
 static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname,
@@ -218,13 +265,14 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
 static Oid	get_ri_constraint_root(Oid constrOid);
-static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
-							   RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
+static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+							 const char *querystr, int nargs, Oid *argtypes,
+							 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
-							RI_QueryKey *qkey, SPIPlanPtr qplan,
+							RI_QueryKey *qkey, RI_Plan *qplan,
 							Relation fk_rel, Relation pk_rel,
 							TupleTableSlot *oldslot, TupleTableSlot *newslot,
-							bool detectNewRows, int expect_OK);
+							bool detectNewRows, int expected_cmdtype);
 static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
 							 Datum *vals, char *nulls);
@@ -232,6 +280,15 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 							   Relation pk_rel, Relation fk_rel,
 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 							   int queryno, bool partgone) pg_attribute_noreturn();
+static void ri_SqlStringPlanCreate(RI_Plan *plan,
+					   const char *querystr, int nargs, Oid *paramtypes);
+static bool ri_SqlStringPlanIsValid(RI_Plan *plan);
+static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+						Datum *vals, char *nulls,
+						Snapshot test_snapshot,
+						Snapshot crosscheck_snapshot,
+						int limit, CmdType *last_stmt_cmdtype);
+static void ri_SqlStringPlanFree(RI_Plan *plan);
 
 
 /*
@@ -247,7 +304,7 @@ RI_FKey_check(TriggerData *trigdata)
 	Relation	pk_rel;
 	TupleTableSlot *newslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, false);
@@ -344,9 +401,6 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/* Fetch or prepare a saved plan for the real check */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
 
@@ -392,8 +446,9 @@ RI_FKey_check(TriggerData *trigdata)
 		}
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -408,10 +463,7 @@ RI_FKey_check(TriggerData *trigdata)
 					fk_rel, pk_rel,
 					NULL, newslot,
 					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
-					SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_SELECT);
 
 	table_close(pk_rel, RowShareLock);
 
@@ -466,16 +518,13 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 				  TupleTableSlot *oldslot,
 				  const RI_ConstraintInfo *riinfo)
 {
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	RI_QueryKey qkey;
 	bool		result;
 
 	/* Only called for non-null rows */
 	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/*
 	 * Fetch or prepare a saved plan for checking PK table with values coming
 	 * from a PK row
@@ -523,8 +572,9 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		}
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -535,10 +585,7 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 							 fk_rel, pk_rel,
 							 oldslot, NULL,
 							 true,	/* treat like update */
-							 SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+							 CMD_SELECT);
 
 	return result;
 }
@@ -632,7 +679,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, true);
@@ -660,9 +707,6 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		return PointerGetDatum(NULL);
 	}
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/*
 	 * Fetch or prepare a saved plan for the restrict lookup (it's the same
 	 * query for delete and update cases)
@@ -715,8 +759,9 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		}
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -727,10 +772,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 					fk_rel, pk_rel,
 					oldslot, NULL,
 					true,		/* must detect new rows */
-					SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_SELECT);
 
 	table_close(fk_rel, RowShareLock);
 
@@ -752,7 +794,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE);
@@ -770,9 +812,6 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 	pk_rel = trigdata->tg_relation;
 	oldslot = trigdata->tg_trigslot;
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/* Fetch or prepare a saved plan for the cascaded delete */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONDELETE);
 
@@ -820,8 +859,9 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 			queryoids[i] = pk_type;
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -833,10 +873,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 					fk_rel, pk_rel,
 					oldslot, NULL,
 					true,		/* must detect new rows */
-					SPI_OK_DELETE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_DELETE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -859,7 +896,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	TupleTableSlot *newslot;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 
 	/* Check that this is a valid trigger call on the right time and event. */
 	ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE);
@@ -879,9 +916,6 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 	newslot = trigdata->tg_newslot;
 	oldslot = trigdata->tg_trigslot;
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/* Fetch or prepare a saved plan for the cascaded update */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONUPDATE);
 
@@ -942,8 +976,9 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 		}
 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys * 2, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -954,10 +989,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 					fk_rel, pk_rel,
 					oldslot, newslot,
 					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_UPDATE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -1039,7 +1071,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 	Relation	pk_rel;
 	TupleTableSlot *oldslot;
 	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	int32		queryno;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
@@ -1055,9 +1087,6 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 	pk_rel = trigdata->tg_relation;
 	oldslot = trigdata->tg_trigslot;
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
 	/*
 	 * Fetch or prepare a saved plan for the trigger.
 	 */
@@ -1174,8 +1203,9 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 			queryoids[i] = pk_type;
 		}
 
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
+		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -1186,10 +1216,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 					fk_rel, pk_rel,
 					oldslot, NULL,
 					true,		/* must detect new rows */
-					SPI_OK_UPDATE);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+					CMD_UPDATE);
 
 	table_close(fk_rel, RowExclusiveLock);
 
@@ -1382,7 +1409,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	int			save_nestlevel;
 	char		workmembuf[32];
 	int			spi_result;
-	SPIPlanPtr	qplan;
+	SPIPlanPtr  qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
 
@@ -1963,7 +1990,7 @@ ri_GenerateQualCollation(StringInfo buf, Oid collation)
 /* ----------
  * ri_BuildQueryKey -
  *
- *	Construct a hashtable key for a prepared SPI plan of an FK constraint.
+ *	Construct a hashtable key for a plan of an FK constraint.
  *
  *		key: output argument, *key is filled in based on the other arguments
  *		riinfo: info derived from pg_constraint entry
@@ -1982,9 +2009,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * the FK constraint (i.e., not the table on which the trigger has been
 	 * fired), and so it will be the same for all members of the inheritance
 	 * tree.  So we may use the root constraint's OID in the hash key, rather
-	 * than the constraint's own OID.  This avoids creating duplicate SPI
-	 * plans, saving lots of work and memory when there are many partitions
-	 * with similar FK constraints.
+	 * than the constraint's own OID.  This avoids creating duplicate plans,
+	 * saving lots of work and memory when there are many partitions with
+	 * similar FK constraints.
 	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
@@ -2258,15 +2285,368 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+/* Query string or an equivalent name to show in the error CONTEXT. */
+typedef struct RIErrorCallbackArg
+{
+	const char *query;
+} RIErrorCallbackArg;
+
+/*
+ * _RI_error_callback
+ *
+ * Add context information when a query being processed with ri_PlanCreate()
+ * or ri_PlanExecute() fails.
+ */
+static void
+_RI_error_callback(void *arg)
+{
+	RIErrorCallbackArg *carg = (RIErrorCallbackArg *) arg;
+	const char *query = carg->query;
+	int			syntaxerrposition;
+
+	Assert(query != NULL);
+
+	/*
+	 * If there is a syntax error position, convert to internal syntax error;
+	 * otherwise treat the query as an item of context stack
+	 */
+	syntaxerrposition = geterrposition();
+	if (syntaxerrposition > 0)
+	{
+		errposition(0);
+		internalerrposition(syntaxerrposition);
+		internalerrquery(query);
+	}
+	else
+		errcontext("SQL statement \"%s\"", query);
+}
+
+/*
+ * This creates a plan for a query written in SQL.
+ *
+ * The main product is a list of CachedPlanSource for each of the queries
+ * resulting from the provided query's rewrite that is saved to
+ * plan->plan_exec_arg.
+ */
+static void
+ri_SqlStringPlanCreate(RI_Plan *plan,
+					   const char *querystr, int nargs, Oid *paramtypes)
+{
+	List	   *raw_parsetree_list;
+	List	   *plancache_list = NIL;
+	ListCell   *list_item;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	Assert(querystr != NULL);
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = querystr;
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/*
+	 * Parse the request string into a list of raw parse trees.
+	 */
+	raw_parsetree_list = raw_parser(querystr, RAW_PARSE_DEFAULT);
+
+	/*
+	 * Do parse analysis and rule rewrite for each raw parsetree, storing the
+	 * results into unsaved plancache entries.
+	 */
+	plancache_list = NIL;
+
+	foreach(list_item, raw_parsetree_list)
+	{
+		RawStmt    *parsetree = lfirst_node(RawStmt, list_item);
+		List	   *stmt_list;
+		CachedPlanSource *plansource;
+
+		/*
+		 * Create the CachedPlanSource before we do parse analysis, since it
+		 * needs to see the unmodified raw parse tree.
+		 */
+		plansource = CreateCachedPlan(parsetree, querystr,
+									  CreateCommandTag(parsetree->stmt));
+
+		stmt_list = pg_analyze_and_rewrite_fixedparams(parsetree, querystr,
+													   paramtypes, nargs,
+													   NULL);
+
+		/* Finish filling in the CachedPlanSource */
+		CompleteCachedPlan(plansource,
+						   stmt_list,
+						   NULL,
+						   paramtypes, nargs,
+						   NULL, NULL, 0,
+						   false);	/* not fixed result */
+
+		SaveCachedPlan(plansource);
+		plancache_list = lappend(plancache_list, plansource);
+	}
+
+	plan->plan_exec_func = ri_SqlStringPlanExecute;
+	plan->plan_exec_arg = (void *) plancache_list;
+	plan->plan_is_valid_func = ri_SqlStringPlanIsValid;
+	plan->plan_free_func = ri_SqlStringPlanFree;
+
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+}
+
+/*
+ * This executes the plan after creating a CachedPlan for each
+ * CachedPlanSource found stored in plan->plan_exec_arg using given
+ * parameter values.
+ *
+ * Return value is the number of tuples returned by the "last" CachedPlan.
+ */
+static int
+ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+						Datum *param_vals, char *param_isnulls,
+						Snapshot test_snapshot,
+						Snapshot crosscheck_snapshot,
+						int limit, CmdType *last_stmt_cmdtype)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell   *lc;
+	CachedPlan *cplan;
+	ResourceOwner plan_owner;
+	int			tuples_processed = 0;	/* appease compiler */
+	ParamListInfo paramLI;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	Assert(list_length(plancache_list) > 0);
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = NULL;		/* will be filled below */
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/*
+	 * Convert the parameters into a format that the planner and the executor
+	 * expect them to be in.
+	 */
+	if (plan->nargs > 0)
+	{
+		paramLI = makeParamList(plan->nargs);
+
+		for (int i = 0; i < plan->nargs; i++)
+		{
+			ParamExternData *prm = &paramLI->params[i];
+
+			prm->value = param_vals[i];
+			prm->isnull = (param_isnulls && param_isnulls[i] == 'n');
+			prm->pflags = PARAM_FLAG_CONST;
+			prm->ptype = plan->paramtypes[i];
+		}
+	}
+	else
+		paramLI = NULL;
+
+	plan_owner = CurrentResourceOwner; /* XXX - why? */
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+		List	   *stmt_list;
+		ListCell   *lc2;
+
+		ricallbackarg.query = plansource->query_string;
+
+		/*
+		 * Replan if needed, and increment plan refcount.  If it's a saved
+		 * plan, the refcount must be backed by the plan_owner.
+		 */
+		cplan = GetCachedPlan(plansource, paramLI, plan_owner, NULL);
+
+		stmt_list = cplan->stmt_list;
+
+		foreach(lc2, stmt_list)
+		{
+			PlannedStmt *stmt = lfirst_node(PlannedStmt, lc2);
+			DestReceiver *dest;
+			QueryDesc  *qdesc;
+			int			eflags;
+
+			*last_stmt_cmdtype = stmt->commandType;
+
+			/*
+			 * Advance the command counter before each command and update the
+			 * snapshot.
+			 */
+			CommandCounterIncrement();
+			UpdateActiveSnapshotCommandId();
+
+			dest = CreateDestReceiver(DestNone);
+			qdesc = CreateQueryDesc(stmt, plansource->query_string,
+									test_snapshot, crosscheck_snapshot,
+									dest, paramLI, NULL, 0);
+
+			/* Select execution options */
+			eflags = EXEC_FLAG_SKIP_TRIGGERS;
+			ExecutorStart(qdesc, eflags);
+			ExecutorRun(qdesc, ForwardScanDirection, limit, true);
+
+			/* We return the last executed statement's value. */
+			tuples_processed = qdesc->estate->es_processed;
+
+			ExecutorFinish(qdesc);
+			ExecutorEnd(qdesc);
+			FreeQueryDesc(qdesc);
+		}
+
+		/* Done with this plan, so release refcount */
+		ReleaseCachedPlan(cplan, plan_owner);
+		cplan = NULL;
+	}
+
+	Assert(cplan == NULL);
+
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+
+	return tuples_processed;
+}
+
+/*
+ * Have any of the CachedPlanSources been invalidated since being created?
+ */
+static bool
+ri_SqlStringPlanIsValid(RI_Plan *plan)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell *lc;
+
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+
+		if (!CachedPlanIsValid(plansource))
+			return false;
+	}
+	return true;
+}
+
+/* Release CachedPlanSources and associated CachedPlans if any. */
+static void
+ri_SqlStringPlanFree(RI_Plan *plan)
+{
+	List   *plancache_list = (List *) plan->plan_exec_arg;
+	ListCell *lc;
+
+	foreach(lc, plancache_list)
+	{
+		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc);
+
+		DropCachedPlan(plansource);
+	}
+}
+
+/*
+ * Create an RI_Plan for a given RI check query and initialize the
+ * plan callbacks and execution argument using the caller specified
+ * function.
+ */
+static RI_Plan *
+ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
+			  const char *querystr, int nargs, Oid *paramtypes)
+{
+	RI_Plan	   *plan;
+	MemoryContext plancxt,
+				oldcxt;
+
+	/*
+	 * Create a memory context for the plan underneath CurrentMemoryContext,
+	 * which is reparented later to be underneath CacheMemoryContext.
+	 */
+	plancxt = AllocSetContextCreate(CurrentMemoryContext,
+									"RI Plan",
+									ALLOCSET_SMALL_SIZES);
+	oldcxt = MemoryContextSwitchTo(plancxt);
+	plan = (RI_Plan *) palloc0(sizeof(*plan));
+	plan->plancxt = plancxt;
+	plan->nargs = nargs;
+	if (plan->nargs > 0)
+	{
+		plan->paramtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
+		memcpy(plan->paramtypes, paramtypes, plan->nargs * sizeof(Oid));
+	}
+
+	plan_create_func(plan, querystr, nargs, paramtypes);
+
+	MemoryContextSetParent(plan->plancxt, CacheMemoryContext);
+	MemoryContextSwitchTo(oldcxt);
+
+	return plan;
+}
+
+/*
+ * Execute the plan by calling plan_exec_func().
+ *
+ * Returns the number of tuples obtained by executing the plan; the caller
+ * typically wants to check if at least 1 row was returned.
+ *
+ * *last_stmt_cmdtype is set to the CmdType of the last operation performed
+ * by executing the plan, which may consist of more than 1 executable
+ * statements if, for example, any rules belonging to the tables mentioned in
+ * the original query added additional operations.
+ */
+static int
+ri_PlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel,
+			   Datum *param_vals, char *param_isnulls,
+			   Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+			   int limit, CmdType *last_stmt_cmdtype)
+{
+	Assert(test_snapshot != NULL && ActiveSnapshotSet());
+	return plan->plan_exec_func(plan, fk_rel, pk_rel,
+								param_vals, param_isnulls,
+								test_snapshot,
+								crosscheck_snapshot,
+								limit, last_stmt_cmdtype);
+}
+
+/*
+ * Is the plan still valid to continue caching?
+ */
+static bool
+ri_PlanIsValid(RI_Plan *plan)
+{
+	return plan->plan_is_valid_func(plan);
+}
+
+/* Release plan resources. */
+static void
+ri_FreePlan(RI_Plan *plan)
+{
+	/* First call the implementation specific release function. */
+	plan->plan_free_func(plan);
+
+	/* Now get rid of the RI_Plan and subsidiary data in its plancxt */
+	MemoryContextDelete(plan->plancxt);
+}
 
 /*
  * Prepare execution plan for a query to enforce an RI restriction
  */
-static SPIPlanPtr
-ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
+static RI_Plan *
+ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+			 const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
-	SPIPlanPtr	qplan;
+	RI_Plan	   *qplan;
 	Relation	query_rel;
 	Oid			save_userid;
 	int			save_sec_context;
@@ -2285,18 +2665,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
 						   SECURITY_NOFORCE_RLS);
-
 	/* Create the plan */
-	qplan = SPI_prepare(querystr, nargs, argtypes);
-
-	if (qplan == NULL)
-		elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr);
+	qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
 
-	/* Save the plan */
-	SPI_keepplan(qplan);
 	ri_HashPreparedPlan(qkey, qplan);
 
 	return qplan;
@@ -2307,10 +2681,10 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
  */
 static bool
 ri_PerformCheck(const RI_ConstraintInfo *riinfo,
-				RI_QueryKey *qkey, SPIPlanPtr qplan,
+				RI_QueryKey *qkey, RI_Plan *qplan,
 				Relation fk_rel, Relation pk_rel,
 				TupleTableSlot *oldslot, TupleTableSlot *newslot,
-				bool detectNewRows, int expect_OK)
+				bool detectNewRows, int expected_cmdtype)
 {
 	Relation	query_rel,
 				source_rel;
@@ -2318,11 +2692,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	Snapshot	test_snapshot;
 	Snapshot	crosscheck_snapshot;
 	int			limit;
-	int			spi_result;
+	int			tuples_processed;
 	Oid			save_userid;
 	int			save_sec_context;
 	Datum		vals[RI_MAX_NUMKEYS * 2];
 	char		nulls[RI_MAX_NUMKEYS * 2];
+	CmdType		last_stmt_cmdtype;
 
 	/*
 	 * Use the query type code to determine whether the query is run against
@@ -2373,30 +2748,36 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	 * the caller passes detectNewRows == false then it's okay to do the query
 	 * with the transaction snapshot; otherwise we use a current snapshot, and
 	 * tell the executor to error out if it finds any rows under the current
-	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
-	 * that SPI_execute_snapshot will register the snapshots, so we don't need
-	 * to bother here.
+	 * snapshot that wouldn't be visible per the transaction snapshot.
+	 *
+	 * Also push the chosen snapshot so that anyplace that wants to use it
+	 * can get it by calling GetActiveSnapshot().
 	 */
 	if (IsolationUsesXactSnapshot() && detectNewRows)
 	{
-		CommandCounterIncrement();	/* be sure all my own work is visible */
 		test_snapshot = GetLatestSnapshot();
 		crosscheck_snapshot = GetTransactionSnapshot();
+		/* Make sure we have a private copy of the snapshot to modify. */
+		PushCopiedSnapshot(test_snapshot);
 	}
 	else
 	{
-		/* the default SPI behavior is okay */
-		test_snapshot = InvalidSnapshot;
+		test_snapshot = GetTransactionSnapshot();
 		crosscheck_snapshot = InvalidSnapshot;
+		PushActiveSnapshot(test_snapshot);
 	}
 
+	/* Also advance the command counter and update the snapshot. */
+	CommandCounterIncrement();
+	UpdateActiveSnapshotCommandId();
+
 	/*
 	 * If this is a select query (e.g., for a 'no action' or 'restrict'
 	 * trigger), we only need to see if there is a single row in the table,
 	 * matching the key.  Otherwise, limit = 0 - because we want the query to
 	 * affect ALL the matching rows.
 	 */
-	limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
+	limit = (expected_cmdtype == CMD_SELECT) ? 1 : 0;
 
 	/* Switch to proper UID to perform check as */
 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
@@ -2405,19 +2786,16 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						   SECURITY_NOFORCE_RLS);
 
 	/* Finally we can run the query. */
-	spi_result = SPI_execute_snapshot(qplan,
-									  vals, nulls,
+	tuples_processed = ri_PlanExecute(qplan, fk_rel, pk_rel, vals, nulls,
 									  test_snapshot, crosscheck_snapshot,
-									  false, false, limit);
+									  limit, &last_stmt_cmdtype);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
 
-	/* Check result */
-	if (spi_result < 0)
-		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+	PopActiveSnapshot();
 
-	if (expect_OK >= 0 && spi_result != expect_OK)
+	if (last_stmt_cmdtype != expected_cmdtype)
 		ereport(ERROR,
 				(errcode(ERRCODE_INTERNAL_ERROR),
 				 errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result",
@@ -2428,15 +2806,15 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 
 	/* XXX wouldn't it be clearer to do this part at the caller? */
 	if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
-		expect_OK == SPI_OK_SELECT &&
-		(SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
+		expected_cmdtype == CMD_SELECT &&
+		(tuples_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
 		ri_ReportViolation(riinfo,
 						   pk_rel, fk_rel,
 						   newslot ? newslot : oldslot,
 						   NULL,
 						   qkey->constr_queryno, false);
 
-	return SPI_processed != 0;
+	return tuples_processed != 0;
 }
 
 /*
@@ -2699,14 +3077,14 @@ ri_InitHashTables(void)
 /*
  * ri_FetchPreparedPlan -
  *
- * Lookup for a query key in our private hash table of prepared
- * and saved SPI execution plans. Return the plan if found or NULL.
+ * Lookup for a query key in our private hash table of saved RI plans.
+ * Return the plan if found or NULL.
  */
-static SPIPlanPtr
+static RI_Plan *
 ri_FetchPreparedPlan(RI_QueryKey *key)
 {
 	RI_QueryHashEntry *entry;
-	SPIPlanPtr	plan;
+	RI_Plan *plan;
 
 	/*
 	 * On the first call initialize the hashtable
@@ -2734,7 +3112,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
 	 * locked both FK and PK rels.
 	 */
 	plan = entry->plan;
-	if (plan && SPI_plan_is_valid(plan))
+	if (plan && ri_PlanIsValid(plan))
 		return plan;
 
 	/*
@@ -2743,7 +3121,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
 	 */
 	entry->plan = NULL;
 	if (plan)
-		SPI_freeplan(plan);
+		ri_FreePlan(plan);
 
 	return NULL;
 }
@@ -2755,7 +3133,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key)
  * Add another plan to our private SPI query plan hashtable.
  */
 static void
-ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan)
+ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan)
 {
 	RI_QueryHashEntry *entry;
 	bool		found;
-- 
2.35.3

From 0d8fc5f14da3fbd0234db46616cae3752dc919a5 Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Tue, 12 Jan 2021 14:17:31 +0900
Subject: [PATCH v6 2/4] Avoid using an SQL query for some RI checks

For RI triggers that want to check if a given referenced value exists
in the referenced relation, it suffices to simply scan the foreign key
constraint's unique index, instead of issuing an SQL query to do the
same thing.

To do so, this commit builds on the RI_Plan infrastructure added in the
previous commit.  It replaces ri_SqlStringPlanCreate(), used in
RI_FKey_check() and ri_Check_Pk_Match() for creating the plan for their
respective checks, with ri_LookupKeyInPkRelPlanCreate(), which installs
ri_LookupKeyInPkRel() as the plan to implement those checks.
ri_LookupKeyInPkRel() contains the logic to directly scan the unique
index associated with the foreign key constraint.
---
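Note (below the "---" line, so not part of the commit message): when the
referenced table is partitioned, the key first has to be routed to a leaf
partition, which means picking the partition key values out of the unique
key by matching attribute numbers.  A minimal standalone sketch of that
matching step follows.  The names AttrNum, KeyValue and
collect_partkey_values() are hypothetical stand-ins chosen for this
illustration (for AttrNumber, Datum and the loop inside
ExecGetLeafPartitionForKey()); it is not the code the patch adds.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for PostgreSQL's AttrNumber and Datum. */
typedef short AttrNum;
typedef long KeyValue;

/*
 * For each partition key column (attribute numbers in part_attrs), find the
 * same attribute among the unique key's columns and copy its value and null
 * flag.  Null flags follow the 'n' / ' ' char convention used by the RI code.
 * Returns the number of partition key columns that were matched.
 */
static int
collect_partkey_values(const AttrNum *part_attrs, int part_natts,
                       const AttrNum *key_attnums, int key_natts,
                       const KeyValue *key_vals, const char *key_nulls,
                       KeyValue *partkey_vals, bool *partkey_isnull)
{
    int j = 0;

    for (int i = 0; i < part_natts; i++)
    {
        for (int k = 0; k < key_natts; k++)
        {
            if (part_attrs[i] == key_attnums[k])
            {
                partkey_vals[j] = key_vals[k];
                partkey_isnull[j] = (key_nulls[k] == 'n');
                j++;
                break;
            }
        }
    }

    /* The unique key must cover every partition key column. */
    assert(j == part_natts);
    return j;
}
```

The final assertion mirrors the requirement that a unique key on a
partitioned table include all of the partition key columns, which is what
allows the key alone to identify the leaf partition.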
 src/backend/executor/execPartition.c | 167 +++++++++-
 src/backend/executor/nodeLockRows.c  | 160 +++++-----
 src/backend/utils/adt/ri_triggers.c  | 448 +++++++++++++++++++++------
 src/include/executor/execPartition.h |   6 +
 src/include/executor/executor.h      |   9 +
 5 files changed, 611 insertions(+), 179 deletions(-)

diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 40e3c07693..764f2b9f8a 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -176,8 +176,9 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
 								  EState *estate,
 								  Datum *values,
 								  bool *isnull);
-static int	get_partition_for_tuple(PartitionDispatch pd, Datum *values,
-									bool *isnull);
+static int get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
 												  Datum *values,
 												  bool *isnull,
@@ -318,7 +319,9 @@ ExecFindPartition(ModifyTableState *mtstate,
 		 * these values, error out.
 		 */
 		if (partdesc->nparts == 0 ||
-			(partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
+			(partidx = get_partition_for_tuple(dispatch->key,
+											   dispatch->partdesc,
+											   values, isnull)) < 0)
 		{
 			char	   *val_desc;
 
@@ -1379,12 +1382,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull)
 {
 	int			bound_offset = -1;
 	int			part_index = -1;
-	PartitionKey key = pd->key;
-	PartitionDesc partdesc = pd->partdesc;
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
 	/*
@@ -1591,6 +1594,158 @@ get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
 	return part_index;
 }
 
+/*
+ * ExecGetLeafPartitionForKey
+ *		Finds the leaf partition of a partitioned table 'root_rel' that might
+ *		contain the specified primary key tuple containing a subset of the
+ *		table's columns (including all of the partition key columns)
+ *
+ * 'key_natts' specifies the number of columns contained in the key,
+ * 'key_attnums' their attribute numbers as defined in 'root_rel', and
+ * 'key_vals' and 'key_nulls' specify the key tuple.
+ *
+ * Any intermediate parent tables encountered on the way to finding the leaf
+ * partition are locked using 'lockmode' when opening.
+ *
+ * Returns NULL if no leaf partition is found for the key.
+ *
+ * This also finds the index in the leaf partition thus found that is recorded
+ * as descending from 'root_idxoid' and returns it in '*leaf_idxoid'.
+ *
+ * Caller must close the returned relation, if any.
+ *
+ * This works because the unique key defined on the root relation is required
+ * to contain the partition key columns of all of the ancestors that lead up to
+ * a given leaf partition.
+ */
+Relation
+ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
+						   const AttrNumber *key_attnums,
+						   Datum *key_vals, char *key_nulls,
+						   Oid root_idxoid, int lockmode,
+						   Oid *leaf_idxoid)
+{
+	Relation	rel = root_rel;
+	Oid			constr_idxoid = root_idxoid;
+
+	*leaf_idxoid = InvalidOid;
+
+	/*
+	 * Descend through partitioned parents to find the leaf partition that
+	 * would accept a row with the provided key values, starting with the root
+	 * parent.
+	 */
+	while (true)
+	{
+		PartitionKey partkey = RelationGetPartitionKey(rel);
+		PartitionDirectory partdir;
+		PartitionDesc partdesc;
+		Datum	partkey_vals[PARTITION_MAX_KEYS];
+		bool	partkey_isnull[PARTITION_MAX_KEYS];
+		AttrNumber *root_partattrs = partkey->partattrs;
+		int		i,
+				j;
+		int		partidx;
+		Oid		partoid;
+		bool	is_leaf;
+
+		/*
+		 * Collect partition key values from the unique key.
+		 *
+		 * Because we only have the root table's copy of key_attnums, we must
+		 * map any non-root table's partition key attribute numbers to the root
+		 * table's.
+		 */
+		if (rel != root_rel)
+		{
+			/*
+			 * map->attnums will contain root table attribute numbers for each
+			 * attribute of the current partitioned relation.
+			 */
+			AttrMap *map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
+														RelationGetDescr(rel));
+
+			if (map)
+			{
+				root_partattrs = palloc(partkey->partnatts *
+										sizeof(AttrNumber));
+				for (i = 0; i < partkey->partnatts; i++)
+				{
+					AttrNumber	partattno = partkey->partattrs[i];
+
+					root_partattrs[i] = map->attnums[partattno - 1];
+				}
+
+				free_attrmap(map);
+			}
+		}
+
+		/*
+		 * Referenced key specification does not allow expressions, so there
+		 * would not be expressions in the partition keys either.
+		 */
+		Assert(partkey->partexprs == NIL);
+		for (i = 0, j = 0; i < partkey->partnatts; i++)
+		{
+			int		k;
+
+			for (k = 0; k < key_natts; k++)
+			{
+				if (root_partattrs[i] == key_attnums[k])
+				{
+					partkey_vals[j] = key_vals[k];
+					partkey_isnull[j] = (key_nulls[k] == 'n');
+					j++;
+					break;
+				}
+			}
+		}
+		/* Had better have found values for all of the partition keys. */
+		Assert(j == partkey->partnatts);
+
+		if (root_partattrs != partkey->partattrs)
+			pfree(root_partattrs);
+
+		/* Get the PartitionDesc using the partition directory machinery.  */
+		partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
+		partdesc = PartitionDirectoryLookup(partdir, rel);
+
+		/* Find the partition for the key. */
+		partidx = get_partition_for_tuple(partkey, partdesc, partkey_vals,
+										  partkey_isnull);
+		Assert(partidx < 0 || partidx < partdesc->nparts);
+
+		/* Done using the partition directory. */
+		DestroyPartitionDirectory(partdir);
+
+		/* Close any intermediate parents we opened, but keep the lock. */
+		if (rel != root_rel)
+			table_close(rel, NoLock);
+
+		/* No partition found. */
+		if (partidx < 0)
+			return NULL;
+
+		partoid = partdesc->oids[partidx];
+		rel = table_open(partoid, lockmode);
+		constr_idxoid = index_get_partition(rel, constr_idxoid);
+
+		/*
+		 * Return if the partition is a leaf, else find its partition in the
+		 * next iteration.
+		 */
+		is_leaf = partdesc->is_leaf[partidx];
+		if (is_leaf)
+		{
+			*leaf_idxoid = constr_idxoid;
+			return rel;
+		}
+	}
+
+	Assert(false);
+	return NULL;
+}
+
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index a74813c7aa..352cacd70b 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,10 +79,7 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
 
 		/* clear any leftover test tuple for this rel */
@@ -179,74 +176,11 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		/* skip tuple if it couldn't be locked */
+		if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+								estate->es_snapshot, estate->es_output_cid,
+								lockmode, erm->waitPolicy, &epq_needed))
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -281,6 +215,90 @@ lnext:
 	return slot;
 }
 
+/*
+ * ExecLockTableTuple
+ * 		Locks tuple with the specified TID in lockmode following given wait
+ * 		policy
+ *
+ * Returns true if the tuple was successfully locked.  Locked tuple is loaded
+ * into provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *epq_needed)
+{
+	TM_FailureData tmfd;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	TM_Result	test;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully, the locked tuple saved in
+			 * slot for EvalPlanQual, if asked by the caller.
+			 */
+			if (tmfd.traversed && epq_needed)
+				*epq_needed = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index cfebd9c4f2..9c52e765fe 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,22 +23,27 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/partition.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
+#include "partitioning/partdesc.h"
 #include "storage/bufmgr.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -50,6 +55,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/ruleutils.h"
@@ -151,6 +157,12 @@ typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan);
  */
 typedef struct RI_Plan
 {
+	/* Constraint for this plan. */
+	const RI_ConstraintInfo *riinfo;
+
+	/* RI query type code. */
+	int				constr_queryno;
+
 	/*
 	 * Context under which this struct and its subsidiary data gets allocated.
 	 * It is made a child of CacheMemoryContext.
@@ -265,7 +277,8 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
 static Oid	get_ri_constraint_root(Oid constrOid);
-static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+static RI_Plan *ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+							 RI_PlanCreateFunc_type plan_create_func,
 							 const char *querystr, int nargs, Oid *argtypes,
 							 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
@@ -289,6 +302,15 @@ static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_r
 						Snapshot crosscheck_snapshot,
 						int limit, CmdType *last_stmt_cmdtype);
 static void ri_SqlStringPlanFree(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes);
+static int ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype);
+static bool ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanFree(RI_Plan *plan);
 
 
 /*
@@ -384,9 +406,9 @@ RI_FKey_check(TriggerData *trigdata)
 
 					/*
 					 * MATCH PARTIAL - all non-null columns must match. (not
-					 * implemented, can be done by modifying the query below
-					 * to only include non-null columns, or by writing a
-					 * special version here)
+					 * implemented, can be done by modifying
+					 * ri_LookupKeyInPkRel() to only include non-null
+					 * columns)
 					 */
 					break;
 #endif
@@ -406,49 +428,9 @@ RI_FKey_check(TriggerData *trigdata)
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *pk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding FK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pf_eq_oprs[i],
-							paramname, fk_type);
-			querysep = "AND";
-			queryoids[i] = fk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_LookupKeyInPkRelPlanCreate(). */
+		qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+							 NULL, 0 /* nargs */, NULL /* argtypes */,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -533,48 +515,9 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		const char *pk_only;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * PK attributes themselves.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pp_eq_oprs[i],
-							paramname, pk_type);
-			querysep = "AND";
-			queryoids[i] = pk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_LookupKeyInPkRelPlanCreate(). */
+		qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+							 NULL, 0 /* nargs */, NULL /* argtypes */,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -760,7 +703,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -860,7 +803,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -977,7 +920,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys * 2, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -1204,7 +1147,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -2013,6 +1956,11 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * saving lots of work and memory when there are many partitions with
 	 * similar FK constraints.
 	 *
+	 * We must not share the plan for RI_PLAN_CHECK_LOOKUPPK queries either,
+	 * because its execution function (ri_LookupKeyInPkRel()) expects to see
+	 * the RI_ConstraintInfo of the individual leaf partitions that the
+	 * query fired on.
+	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
 	 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
@@ -2020,7 +1968,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
 	 * to use memset to clear them.
 	 */
-	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+		constr_queryno != RI_PLAN_CHECK_LOOKUPPK)
 		key->constr_id = riinfo->constraint_root_id;
 	else
 		key->constr_id = riinfo->constraint_id;
@@ -2285,10 +2234,17 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+typedef enum RI_Plantype
+{
+	RI_PLAN_SQL = 0,
+	RI_PLAN_CHECK_FUNCTION
+} RI_Plantype;
+
 /* Query string or an equivalent name to show in the error CONTEXT. */
 typedef struct RIErrorCallbackArg
 {
 	const char *query;
+	RI_Plantype plantype;
 } RIErrorCallbackArg;
 
 /*
@@ -2318,7 +2274,17 @@ _RI_error_callback(void *arg)
 		internalerrquery(query);
 	}
 	else
-		errcontext("SQL statement \"%s\"", query);
+	{
+		switch (carg->plantype)
+		{
+			case RI_PLAN_SQL:
+				errcontext("SQL statement \"%s\"", query);
+				break;
+			case RI_PLAN_CHECK_FUNCTION:
+				errcontext("RI check function \"%s\"", query);
+				break;
+		}
+	}
 }
 
 /*
@@ -2555,14 +2521,277 @@ ri_SqlStringPlanFree(RI_Plan *plan)
 	}
 }
 
+/*
+ * Creates an RI_Plan to look a key up in the PK table.
+ *
+ * Not much to do beside initializing the expected callback members, because
+ * there is no query string to parse and plan.
+ */
+static void
+ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes)
+{
+	Assert(querystr == NULL);
+	plan->plan_exec_func = ri_LookupKeyInPkRel;
+	plan->plan_exec_arg = NULL;
+	plan->plan_is_valid_func = ri_LookupKeyInPkRelPlanIsValid;
+	plan->plan_free_func = ri_LookupKeyInPkRelPlanFree;
+}
+
+/*
+ * get_fkey_unique_index
+ * 		Returns the unique index used by a (supposed) foreign key constraint
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+	Oid			result = InvalidOid;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+	if (HeapTupleIsValid(tp))
+	{
+		Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+		if (contup->contype == CONSTRAINT_FOREIGN)
+			result = contup->conindid;
+		ReleaseSysCache(tp);
+	}
+
+	if (!OidIsValid(result))
+		elog(ERROR, "unique index not found for foreign key constraint %u",
+			 conoid);
+
+	return result;
+}
+
+/*
+ * Checks whether a tuple containing the unique key given by pk_vals,
+ * pk_nulls exists in 'pk_rel'.  The key is looked up using the constraint's
+ * index given in plan->riinfo.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on its leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation (fk_rel) or the one being deleted from the referenced relation
+ * (pk_rel).
+ */
+static int
+ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype)
+{
+	const RI_ConstraintInfo *riinfo = plan->riinfo;
+	Oid			constr_id = riinfo->constraint_id;
+	Oid			idxoid;
+	Relation	idxrel;
+	Relation	leaf_pk_rel = NULL;
+	int			num_pk;
+	int			i;
+	int			tuples_processed = 0;
+	const Oid  *eq_oprs;
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	IndexScanDesc	scan;
+	TupleTableSlot *outslot;
+	AclResult	aclresult;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	/* We're effectively doing a CMD_SELECT below. */
+	*last_stmt_cmdtype = CMD_SELECT;
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = pstrdup("ri_LookupKeyInPkRel");
+	ricallbackarg.plantype = RI_PLAN_CHECK_FUNCTION;
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/* XXX Maybe afterTriggerInvokeEvents() / AfterTriggerExecute() should? */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * Choose the equality operators to use when scanning the PK index below.
+	 */
+	if (plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+	{
+		/* Use PK = FK equality operator. */
+		eq_oprs = riinfo->pf_eq_oprs;
+
+		/*
+		 * May need to cast each of the individual values of the foreign key
+		 * to the corresponding PK column's type if the equality operator
+		 * demands it.
+		 */
+		for (i = 0; i < riinfo->nkeys; i++)
+		{
+			if (pk_nulls[i] != 'n')
+			{
+				Oid		eq_opr = eq_oprs[i];
+				Oid		typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+				RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+				if (OidIsValid(entry->cast_func_finfo.fn_oid))
+					pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+											   pk_vals[i],
+											   Int32GetDatum(-1), /* typmod */
+											   BoolGetDatum(false)); /* implicit coercion */
+			}
+		}
+	}
+	else
+	{
+		Assert(plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
+		/* Use PK = PK equality operator. */
+		eq_oprs = riinfo->pp_eq_oprs;
+	}
+
+	/*
+	 * Must explicitly check that the new user has permissions to look into the
+	 * schema of and SELECT from the referenced table.
+	 */
+	aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
+									  GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(pk_rel)));
+	aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(pk_rel));
+
+	/*
+	 * Open the constraint index to be scanned.
+	 *
+	 * If the target table is partitioned, we must look up the leaf partition
+	 * and its corresponding unique index to search the keys in.
+	 */
+	idxoid = get_fkey_unique_index(constr_id);
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Oid		leaf_idxoid;
+
+		/*
+		 * Note that this relies on the latest snapshot having been pushed by
+		 * the caller to be the ActiveSnapshot.  The PartitionDesc machinery
+		 * that runs as part of this will need to use the snapshot to determine
+		 * whether to omit or include any detach-pending partition based on
+		 * whether the pg_inherits row that marks it as detach-pending is
+		 * visible to it or not, respectively.
+		 */
+		leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
+												 riinfo->pk_attnums,
+												 pk_vals, pk_nulls,
+												 idxoid, RowShareLock,
+												 &leaf_idxoid);
+
+		/*
+		 * If no suitable leaf partition exists, neither can the key we're
+		 * looking for.
+		 */
+		if (leaf_pk_rel == NULL)
+			goto done;
+
+		pk_rel = leaf_pk_rel;
+		idxoid = leaf_idxoid;
+	}
+	idxrel = index_open(idxoid, RowShareLock);
+
+	/* Set up ScanKeys for the index scan. */
+	num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+	for (i = 0; i < num_pk; i++)
+	{
+		int			pkattno = i + 1;
+		Oid			operator = eq_oprs[i];
+		Oid			opfamily = idxrel->rd_opfamily[i];
+		StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
+		RegProcedure regop = get_opcode(operator);
+
+		/* Initialize the scankey. */
+		ScanKeyInit(&skey[i],
+					pkattno,
+					strat,
+					regop,
+					pk_vals[i]);
+
+		skey[i].sk_collation = idxrel->rd_indcollation[i];
+
+		/*
+		 * Flag null key values.  None should reach here, because callers
+		 * currently handle the cases in which nulls occur themselves.
+		 */
+		if (pk_nulls[i] == 'n')
+			skey[i].sk_flags |= SK_ISNULL;
+	}
+
+	scan = index_beginscan(pk_rel, idxrel, test_snapshot, num_pk, 0);
+	index_rescan(scan, skey, num_pk, NULL, 0);
+
+	/* Look for the tuple, and if found, try to lock it in key share mode. */
+	outslot = table_slot_create(pk_rel, NULL);
+	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	{
+		/*
+		 * If we fail to lock the tuple for whatever reason, assume it doesn't
+		 * exist.
+		 */
+		if (ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+							   test_snapshot,
+							   GetCurrentCommandId(false),
+							   LockTupleKeyShare,
+							   LockWaitBlock, NULL))
+			tuples_processed = 1;
+	}
+
+	index_endscan(scan);
+	ExecDropSingleTupleTableSlot(outslot);
+
+	/* Don't release lock until commit. */
+	index_close(idxrel, NoLock);
+
+	/* Close leaf partition relation if any. */
+	if (leaf_pk_rel)
+		table_close(leaf_pk_rel, NoLock);
+
+done:
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+
+	return tuples_processed;
+}
+
+static bool
+ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan)
+{
+	/* This plan never stores anything that can be invalidated. */
+	return true;
+}
+
+static void
+ri_LookupKeyInPkRelPlanFree(RI_Plan *plan)
+{
+	/* Nothing to free. */
+}
+
 /*
  * Create an RI_Plan for a given RI check query and initialize the
  * plan callbacks and execution argument using the caller specified
  * function.
  */
 static RI_Plan *
-ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
-			  const char *querystr, int nargs, Oid *paramtypes)
+ri_PlanCreate(const RI_ConstraintInfo *riinfo,
+			  RI_PlanCreateFunc_type plan_create_func,
+			  const char *querystr, int nargs, Oid *paramtypes,
+			  int constr_queryno)
 {
 	RI_Plan	   *plan;
 	MemoryContext plancxt,
@@ -2577,6 +2806,8 @@ ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
 									ALLOCSET_SMALL_SIZES);
 	oldcxt = MemoryContextSwitchTo(plancxt);
 	plan = (RI_Plan *) palloc0(sizeof(*plan));
+	plan->riinfo = riinfo;
+	plan->constr_queryno = constr_queryno;
 	plan->plancxt = plancxt;
 	plan->nargs = nargs;
 	if (plan->nargs > 0)
@@ -2642,7 +2873,8 @@ ri_FreePlan(RI_Plan *plan)
  * Prepare execution plan for a query to enforce an RI restriction
  */
 static RI_Plan *
-ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+			 RI_PlanCreateFunc_type plan_create_func,
 			 const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
@@ -2666,7 +2898,8 @@ ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
 						   SECURITY_NOFORCE_RLS);
 	/* Create the plan */
-	qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes);
+	qplan = ri_PlanCreate(riinfo, plan_create_func, querystr, nargs,
+						  argtypes, qkey->constr_queryno);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
@@ -3277,7 +3510,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not.  The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3333,8 +3569,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the values that will be passed to the
+		 * operator will be of expected operand type(s).  The operator can be
+		 * cross-type (such as when called by ri_LookupKeyInPkRel()), in which
+		 * case, we only need the cast if the right operand value doesn't match
+		 * the type expected by the operator.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 708435e952..cbe1d996e6 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -31,6 +31,12 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
 										EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
 									PartitionTupleRouting *proute);
+extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
+										   int key_natts,
+										   const AttrNumber *key_attnums,
+										   Datum *key_vals, char *key_nulls,
+										   Oid root_idxoid, int lockmode,
+										   Oid *leaf_idxoid);
 
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index ed95ed1176..2f415b80ce 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -243,6 +243,15 @@ extern void ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in execLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid,
+							   TupleTableSlot *slot, Snapshot snapshot,
+							   CommandId cid, LockTupleMode lockmode,
+							   LockWaitPolicy waitPolicy, bool *epq_needed);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
-- 
2.35.3
