From efee2160e8ea4718385f630eef2444a899c288bb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Fri, 16 Apr 2021 15:08:47 +0530
Subject: [PATCH v5] Avoid Catalogue Accesses In conversion_error_callback

Avoid accessing system catalogs inside the conversion_error_callback
error context callback, because the transaction may already be in a
failed state when the callback runs.

For foreign joins, conversion_error_callback performs system catalog
lookups via get_attname() and get_rel_name().  Instead, collect the
required relation and attribute names once, when the foreign scan
begins, and store them in the scan's PgFdwScanState.  This is done only
once per scan, so it adds no per-tuple overhead.  conversion_error_callback
then reads the stored names rather than the catalogs.
---
 contrib/postgres_fdw/postgres_fdw.c | 167 ++++++++++++++++++++++++++--
 1 file changed, 157 insertions(+), 10 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c590f374c6..df794485ef 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -171,6 +171,12 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	int			fetch_size;		/* number of tuples per fetch */
+
+	/*
+	 * Hash Table to store information that is used to identify the attribute
+	 * when data conversion fails in foreign joins.
+	 */
+	HTAB 		*fj_htab;
 } PgFdwScanState;
 
 /*
@@ -297,6 +303,18 @@ typedef struct
 	int64		offset_est;
 } PgFdwPathExtraData;
 
+/*
+ * Entry of PgFdwScanState.fj_htab: per-foreign-table names cached so that a
+ * data conversion failure in a foreign join can be reported by name.
+ */
+typedef struct ForeignJoinRelInfo
+{
+	Oid		relid;		/* hash key: OID of the foreign table */
+	char    *relname;	/* name of the foreign table */
+	List    *attnos;	/* positions in fdw_scan_tlist (integer list) */
+	List    *attnames;	/* String nodes parallel to attnos; NULL str = whole row */
+} ForeignJoinRelInfo;
+
 /*
  * Identify the attribute where data conversion fails.
  */
@@ -547,7 +565,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int get_batch_size_option(Relation rel);
-
+static void fill_foreign_join_att_info(ForeignScanState *node);
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1440,6 +1458,107 @@ postgresGetForeignPlan(PlannerInfo *root,
 							outer_plan);
 }
 
+/*
+ * fill_foreign_join_att_info
+ *		Cache relation and column names for a foreign join scan.
+ *
+ * For every Var in fdw_scan_tlist that the remote query retrieves, remember
+ * the owning foreign table's name and the column name in fsstate->fj_htab,
+ * keyed by the table's OID.  conversion_error_callback reads this cache so
+ * that it needs no system catalog lookups while reporting an error.
+ */
+static void
+fill_foreign_join_att_info(ForeignScanState *node)
+{
+	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+	EState	   *estate = node->ss.ps.state;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	ListCell   *lc;
+
+	/*
+	 * For base foreign relations, the required information can be fetched
+	 * without system catalog lookups, so there is nothing to cache.
+	 */
+	if (fsplan->scan.scanrelid > 0 && fsstate->rel)
+		return;
+
+	/* If no foreign relation attributes are retrieved, just return. */
+	if (!fsstate->retrieved_attrs)
+		return;
+
+	Assert(fsplan->scan.scanrelid == 0 && !fsstate->rel);
+
+	foreach(lc, fsstate->retrieved_attrs)
+	{
+		int			i = lfirst_int(lc);
+		TargetEntry *tle;
+		Var		   *var;
+		Oid			relid;
+		char	   *attname;
+		ForeignJoinRelInfo *fj_rel_info;
+		bool		found;
+
+		/* retrieved_attrs holds 1-based positions in fdw_scan_tlist. */
+		tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, i - 1);
+
+		/* Non-Var expressions need no cached names. */
+		if (!IsA(tle->expr, Var))
+			continue;
+
+		var = (Var *) tle->expr;
+		relid = exec_rt_fetch(var->varno, estate)->relid;
+
+		/*
+		 * For a whole-row reference (varattno 0) store a NULL name; the
+		 * error callback uses that to detect the whole-row case.
+		 */
+		if (AttributeNumberIsValid(var->varattno))
+			attname = get_attname(relid, var->varattno, false);
+		else
+			attname = NULL;
+
+		/* First time through, create the hash table. */
+		if (!fsstate->fj_htab)
+		{
+			HASHCTL		hctl;
+
+			memset(&hctl, 0, sizeof(HASHCTL));
+			hctl.keysize = sizeof(Oid);
+			hctl.entrysize = sizeof(ForeignJoinRelInfo);
+			hctl.hcxt = CurrentMemoryContext;
+
+			fsstate->fj_htab = hash_create("Attribute Info for Foreign Table Joins",
+										   32,	/* start small and extend */
+										   &hctl,
+										   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+		}
+
+		/* Find or create the cached entry for this foreign table. */
+		fj_rel_info = (ForeignJoinRelInfo *) hash_search(fsstate->fj_htab,
+														 &relid,
+														 HASH_ENTER,
+														 &found);
+
+		if (!found)
+		{
+			/*
+			 * Look up the relation name only once per table; doing it for
+			 * every attribute would repeat the catalog lookup and leave an
+			 * unused palloc'd copy behind each time.
+			 */
+			fj_rel_info->relid = relid;
+			fj_rel_info->relname = get_rel_name(relid);
+			fj_rel_info->attnos = NIL;
+			fj_rel_info->attnames = NIL;
+		}
+
+		/* attnos and attnames are parallel lists: one entry per Var. */
+		fj_rel_info->attnos = lappend_int(fj_rel_info->attnos, i);
+		fj_rel_info->attnames = lappend(fj_rel_info->attnames,
+										makeString(attname));
+	}
+}
+
 /*
  * postgresBeginForeignScan
  *		Initiate an executor scan of a foreign PostgreSQL table.
@@ -1544,6 +1663,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 
 	/* Set the async-capable flag */
 	fsstate->async_capable = node->ss.ps.plan->async_capable;
+
+	/* Capture the attribute info in case of foreign joins. */
+	fill_foreign_join_att_info(node);
 }
 
 /*
@@ -7135,7 +7257,6 @@ conversion_error_callback(void *arg)
 		/* error occurred in a scan against a foreign join */
 		ForeignScanState *fsstate = errpos->fsstate;
 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
-		EState	   *estate = fsstate->ss.ps.state;
 		TargetEntry *tle;
 
 		tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
@@ -7148,17 +7269,43 @@ conversion_error_callback(void *arg)
 		 */
 		if (IsA(tle->expr, Var))
 		{
-			RangeTblEntry *rte;
-			Var		   *var = (Var *) tle->expr;
-
+			RangeTblEntry	*rte;
+			Var		*var;
+			EState	   *estate;
+			ListCell	*lc1;
+			ListCell 	*lc2;
+			ForeignJoinRelInfo *fj_rel_info;
+			bool	found;
+			PgFdwScanState	*fdwstate;
+
+			fdwstate = (PgFdwScanState *) fsstate->fdw_state;
+			estate = fsstate->ss.ps.state;
+			var = (Var *) tle->expr;
 			rte = exec_rt_fetch(var->varno, estate);
 
-			if (var->varattno == 0)
-				is_wholerow = true;
-			else
-				attname = get_attname(rte->relid, var->varattno, false);
+			Assert(fdwstate && fdwstate->fj_htab);
 
-			relname = get_rel_name(rte->relid);
+			fj_rel_info = (ForeignJoinRelInfo *) hash_search(fdwstate->fj_htab,
+															 (void *) &rte->relid,
+															 HASH_FIND,
+															 &found);
+
+			Assert(found && fj_rel_info && fj_rel_info->relid == rte->relid);
+
+			relname = fj_rel_info->relname;
+
+			forboth(lc1, fj_rel_info->attnos, lc2, fj_rel_info->attnames)
+			{
+				if (lfirst_int(lc1) == errpos->cur_attno)
+				{
+					attname = strVal(lfirst(lc2));
+
+					if (!attname)
+						is_wholerow = true;
+
+					break;
+				}
+			}
 		}
 		else
 			errcontext("processing expression at position %d in select list",
-- 
2.25.1

