From d97da108bd1bc8d868740553002df65fa43ac565 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 26 May 2023 14:16:50 +0530
Subject: [PATCH v1] pg_get_indexdef modification to use TxnSnapshot.

Change pg_get_indexdef() to use systable_beginscan() based on
transaction-snapshot rather than using SearchSysCache(). The
motivation is to fix the issue mentioned atop pg_dump.c
---
 src/backend/utils/adt/ruleutils.c | 168 ++++++++++++++++++++++++++++--
 1 file changed, 160 insertions(+), 8 deletions(-)

diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index d3a973d86b..cd9687084b 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -61,6 +61,7 @@
 #include "rewrite/rewriteSupport.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/datum.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -518,6 +519,11 @@ static char *generate_qualified_type_name(Oid typid);
 static text *string_to_text(char *str);
 static char *flatten_reloptions(Oid relid);
 static void get_reloptions(StringInfo buf, Datum reloptions);
+static Datum get_attoptions_using_snapshot(Oid relid, int16 attnum,
+										   Snapshot snapshot);
+static char *flatten_reloptions_using_snapshot(Oid relid, Snapshot snapshot);
+
+
 
 #define only_marker(rte)  ((rte)->inh ? "" : "ONLY ")
 
@@ -1267,17 +1273,45 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 	StringInfoData buf;
 	char	   *str;
 	char	   *sep;
+	SysScanDesc scandesc;
+	SysScanDesc scandescRel;
+	SysScanDesc scandescAm;
+	ScanKeyData scankey[1],
+				scankeyRel[1],
+				scankeyAm[1];
+
+	Snapshot	snapshot = RegisterSnapshot(GetTransactionSnapshot());
+	Relation	indexRelation = table_open(IndexRelationId, AccessShareLock);
+	Relation	relation = table_open(RelationRelationId, AccessShareLock);
+	Relation	relam = table_open(AccessMethodRelationId, AccessShareLock);
 
 	/*
 	 * Fetch the pg_index tuple by the Oid of the index
 	 */
-	ht_idx = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexrelid));
+	ScanKeyInit(&scankey[0], Anum_pg_index_indexrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(indexrelid));
+
+	scandesc = systable_beginscan(indexRelation,
+								  IndexRelidIndexId,
+								  true, snapshot, 1, scankey);
+
+	ht_idx = systable_getnext(scandesc);
+
 	if (!HeapTupleIsValid(ht_idx))
 	{
 		if (missing_ok)
+		{
+			systable_endscan(scandesc);
+			table_close(indexRelation, AccessShareLock);
+			table_close(relation, AccessShareLock);
+			table_close(relam, AccessShareLock);
+			UnregisterSnapshot(snapshot);
 			return NULL;
+		}
 		elog(ERROR, "cache lookup failed for index %u", indexrelid);
 	}
+
 	idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
 
 	indrelid = idxrec->indrelid;
@@ -1299,7 +1333,15 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 	/*
 	 * Fetch the pg_class tuple of the index relation
 	 */
-	ht_idxrel = SearchSysCache1(RELOID, ObjectIdGetDatum(indexrelid));
+	ScanKeyInit(&scankeyRel[0],
+				Anum_pg_class_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(indexrelid));
+
+	scandescRel = systable_beginscan(relation, ClassOidIndexId,
+									 true, snapshot, 1, scankeyRel);
+
+	ht_idxrel = systable_getnext(scandescRel);
 	if (!HeapTupleIsValid(ht_idxrel))
 		elog(ERROR, "cache lookup failed for relation %u", indexrelid);
 	idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
@@ -1307,7 +1349,15 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 	/*
 	 * Fetch the pg_am tuple of the index' access method
 	 */
-	ht_am = SearchSysCache1(AMOID, ObjectIdGetDatum(idxrelrec->relam));
+	ScanKeyInit(&scankeyAm[0],
+				Anum_pg_am_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(idxrelrec->relam));
+
+	scandescAm = systable_beginscan(relam, AmOidIndexId,
+									true, snapshot, 1, scankeyAm);
+
+	ht_am = systable_getnext(scandescAm);
 	if (!HeapTupleIsValid(ht_am))
 		elog(ERROR, "cache lookup failed for access method %u",
 			 idxrelrec->relam);
@@ -1432,7 +1482,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		{
 			int16		opt = indoption->values[keyno];
 			Oid			indcoll = indcollation->values[keyno];
-			Datum		attoptions = get_attoptions(indexrelid, keyno + 1);
+			Datum		attoptions = get_attoptions_using_snapshot(indexrelid, keyno + 1, snapshot);
 			bool		has_options = attoptions != (Datum) 0;
 
 			/* Add collation, if not default for column */
@@ -1488,7 +1538,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		/*
 		 * If it has options, append "WITH (options)"
 		 */
-		str = flatten_reloptions(indexrelid);
+		str = flatten_reloptions_using_snapshot(indexrelid, snapshot);
 		if (str)
 		{
 			appendStringInfo(&buf, " WITH (%s)", str);
@@ -1539,9 +1589,13 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 	}
 
 	/* Clean up */
-	ReleaseSysCache(ht_idx);
-	ReleaseSysCache(ht_idxrel);
-	ReleaseSysCache(ht_am);
+	systable_endscan(scandesc);
+	systable_endscan(scandescRel);
+	systable_endscan(scandescAm);
+	table_close(indexRelation, AccessShareLock);
+	table_close(relation, AccessShareLock);
+	table_close(relam, AccessShareLock);
+	UnregisterSnapshot(snapshot);
 
 	return buf.data;
 }
@@ -12606,3 +12660,101 @@ get_range_partbound_string(List *bound_datums)
 
 	return buf->data;
 }
+
+/*
+ * get_attoptions_using_snapshot
+ *
+ *		Given the relation id and the attribute number,
+ *		return the attribute options text[] datum, if any.
+ *		It uses snapshot provided by caller to do the scan.
+ */
+static Datum
+get_attoptions_using_snapshot(Oid relid, int16 attnum, Snapshot snapshot)
+{
+	HeapTuple	tuple;
+	Datum		attopts;
+	Datum		result;
+	bool		isnull;
+	SysScanDesc scandesc;
+	ScanKeyData scankey[2];
+	Relation	relAtt = table_open(AttributeRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey[0],
+				Anum_pg_attribute_attrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	ScanKeyInit(&scankey[1],
+				Anum_pg_attribute_attnum,
+				BTEqualStrategyNumber, F_INT2EQ,
+				Int16GetDatum(attnum));
+
+	scandesc = systable_beginscan(relAtt, AttributeRelidNumIndexId,
+								  true, snapshot, 2, scankey);
+
+	tuple = systable_getnext(scandesc);
+	if (!HeapTupleIsValid(tuple))
+	{
+		table_close(relAtt, AccessShareLock);
+		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+			 attnum, relid);
+	}
+
+	attopts = SysCacheGetAttr(ATTNAME, tuple, Anum_pg_attribute_attoptions,
+							  &isnull);
+
+	if (isnull)
+		result = (Datum) 0;
+	else
+		result = datumCopy(attopts, false, -1); /* text[] */
+
+	table_close(relAtt, AccessShareLock);
+	systable_endscan(scandesc);
+
+	return result;
+}
+
+/*
+ * Generate a C string representing a relation's reloptions, or NULL if none.
+ * It uses snapshot provided by caller to do the scan.
+ */
+static char *
+flatten_reloptions_using_snapshot(Oid relid, Snapshot snapshot)
+{
+	char	   *result = NULL;
+	HeapTuple	tuple;
+	Datum		reloptions;
+	bool		isnull;
+	SysScanDesc scandesc;
+	ScanKeyData scankey[1];
+	Relation	relation = table_open(RelationRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey[0],
+				Anum_pg_class_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	scandesc = systable_beginscan(relation, ClassOidIndexId,
+								  true, snapshot, 1, scankey);
+
+	tuple = systable_getnext(scandesc);
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for relation %u", relid);
+
+	reloptions = SysCacheGetAttr(RELOID, tuple,
+								 Anum_pg_class_reloptions, &isnull);
+	if (!isnull)
+	{
+		StringInfoData buf;
+
+		initStringInfo(&buf);
+		get_reloptions(&buf, reloptions);
+
+		result = buf.data;
+	}
+
+	table_close(relation, AccessShareLock);
+	systable_endscan(scandesc);
+
+	return result;
+}
-- 
2.34.1

