From dcdd85209f6fc9153f300ba659d92d27a03b69b8 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Sat, 9 Jan 2021 06:53:09 +0530
Subject: [PATCH v4 1/2] Rearrange Refresh Mat View Code

Currently, the function ExecRefreshMatView in matview.c is having
many lines of code which is not at all good from readability and
maintainability perspectives. This patch adds few functions and
moves the code from ExecRefreshMatView to them making the code
look better.
---
 src/backend/commands/matview.c | 452 ++++++++++++++++++++-------------
 1 file changed, 272 insertions(+), 180 deletions(-)

diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index c5c25ce11d..a8d65bba50 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -64,7 +64,7 @@ static void transientrel_startup(DestReceiver *self, int operation, TupleDesc ty
 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
-static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
+static uint64 refresh_matview_datafill(Oid OIDNewHeap, Query *query,
 									   const char *queryString);
 static char *make_temptable_name_n(char *tempname, int n);
 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
@@ -73,6 +73,16 @@ static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersist
 static bool is_usable_unique_index(Relation indexRel);
 static void OpenMatViewIncrementalMaintenance(void);
 static void CloseMatViewIncrementalMaintenance(void);
+static Query *get_matview_query(RefreshMatViewStmt *stmt, Relation *rel,
+								Oid *objectId);
+static Query *rewrite_refresh_matview_query(Query *dataQuery);
+static Oid	get_new_heap_oid(RefreshMatViewStmt *stmt, Relation matviewRel,
+							 Oid matviewOid, char *relpersistence);
+static void match_matview_with_new_data(RefreshMatViewStmt *stmt,
+										Relation matviewRel, Oid matviewOid,
+										Oid OIDNewHeap, Oid relowner,
+										int save_sec_context,
+										char relpersistence, uint64 processed);
 
 /*
  * SetMatViewPopulatedState
@@ -140,127 +150,18 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 {
 	Oid			matviewOid;
 	Relation	matviewRel;
-	RewriteRule *rule;
-	List	   *actions;
 	Query	   *dataQuery;
-	Oid			tableSpace;
-	Oid			relowner;
 	Oid			OIDNewHeap;
-	DestReceiver *dest;
 	uint64		processed = 0;
-	bool		concurrent;
-	LOCKMODE	lockmode;
+	Oid			relowner;
 	char		relpersistence;
 	Oid			save_userid;
 	int			save_sec_context;
 	int			save_nestlevel;
 	ObjectAddress address;
 
-	/* Determine strength of lock needed. */
-	concurrent = stmt->concurrent;
-	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
-
-	/*
-	 * Get a lock until end of transaction.
-	 */
-	matviewOid = RangeVarGetRelidExtended(stmt->relation,
-										  lockmode, 0,
-										  RangeVarCallbackOwnsTable, NULL);
-	matviewRel = table_open(matviewOid, NoLock);
-
-	/* Make sure it is a materialized view. */
-	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("\"%s\" is not a materialized view",
-						RelationGetRelationName(matviewRel))));
-
-	/* Check that CONCURRENTLY is not specified if not populated. */
-	if (concurrent && !RelationIsPopulated(matviewRel))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
-
-	/* Check that conflicting options have not been specified. */
-	if (concurrent && stmt->skipData)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
-
-	/*
-	 * Check that everything is correct for a refresh. Problems at this point
-	 * are internal errors, so elog is sufficient.
-	 */
-	if (matviewRel->rd_rel->relhasrules == false ||
-		matviewRel->rd_rules->numLocks < 1)
-		elog(ERROR,
-			 "materialized view \"%s\" is missing rewrite information",
-			 RelationGetRelationName(matviewRel));
-
-	if (matviewRel->rd_rules->numLocks > 1)
-		elog(ERROR,
-			 "materialized view \"%s\" has too many rules",
-			 RelationGetRelationName(matviewRel));
-
-	rule = matviewRel->rd_rules->rules[0];
-	if (rule->event != CMD_SELECT || !(rule->isInstead))
-		elog(ERROR,
-			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
-			 RelationGetRelationName(matviewRel));
-
-	actions = rule->actions;
-	if (list_length(actions) != 1)
-		elog(ERROR,
-			 "the rule for materialized view \"%s\" is not a single action",
-			 RelationGetRelationName(matviewRel));
-
-	/*
-	 * Check that there is a unique index with no WHERE clause on one or more
-	 * columns of the materialized view if CONCURRENTLY is specified.
-	 */
-	if (concurrent)
-	{
-		List	   *indexoidlist = RelationGetIndexList(matviewRel);
-		ListCell   *indexoidscan;
-		bool		hasUniqueIndex = false;
-
-		foreach(indexoidscan, indexoidlist)
-		{
-			Oid			indexoid = lfirst_oid(indexoidscan);
-			Relation	indexRel;
-
-			indexRel = index_open(indexoid, AccessShareLock);
-			hasUniqueIndex = is_usable_unique_index(indexRel);
-			index_close(indexRel, AccessShareLock);
-			if (hasUniqueIndex)
-				break;
-		}
-
-		list_free(indexoidlist);
-
-		if (!hasUniqueIndex)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("cannot refresh materialized view \"%s\" concurrently",
-							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
-													   RelationGetRelationName(matviewRel))),
-					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
-	}
-
-	/*
-	 * The stored query was rewritten at the time of the MV definition, but
-	 * has not been scribbled on by the planner.
-	 */
-	dataQuery = linitial_node(Query, actions);
-
-	/*
-	 * Check for active uses of the relation in the current transaction, such
-	 * as open scans.
-	 *
-	 * NB: We count on this to protect us against problems with refreshing the
-	 * data using TABLE_INSERT_FROZEN.
-	 */
-	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
+	/* Get the data generating query. */
+	dataQuery = get_matview_query(stmt, &matviewRel, &matviewOid);
 
 	/*
 	 * Tentatively mark the matview as populated or not (this will roll back
@@ -281,27 +182,8 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
 	save_nestlevel = NewGUCNestLevel();
 
-	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
-	if (concurrent)
-	{
-		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
-		relpersistence = RELPERSISTENCE_TEMP;
-	}
-	else
-	{
-		tableSpace = matviewRel->rd_rel->reltablespace;
-		relpersistence = matviewRel->rd_rel->relpersistence;
-	}
-
-	/*
-	 * Create the transient table that will receive the regenerated data. Lock
-	 * it against access by any other process until commit (by which time it
-	 * will be gone).
-	 */
-	OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
-							   ExclusiveLock);
-	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
-	dest = CreateTransientRelDestReceiver(OIDNewHeap);
+	OIDNewHeap = get_new_heap_oid(stmt, matviewRel, matviewOid,
+								  &relpersistence);
 
 	/*
 	 * Now lock down security-restricted operations.
@@ -311,40 +193,16 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 
 	/* Generate the data, if wanted. */
 	if (!stmt->skipData)
-		processed = refresh_matview_datafill(dest, dataQuery, queryString);
-
-	/* Make the matview match the newly generated data. */
-	if (concurrent)
 	{
-		int			old_depth = matview_maintenance_depth;
+		dataQuery = rewrite_refresh_matview_query(dataQuery);
 
-		PG_TRY();
-		{
-			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
-								   save_sec_context);
-		}
-		PG_CATCH();
-		{
-			matview_maintenance_depth = old_depth;
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-		Assert(matview_maintenance_depth == old_depth);
+		processed = refresh_matview_datafill(OIDNewHeap, dataQuery,
+											 queryString);
 	}
-	else
-	{
-		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
 
-		/*
-		 * Inform stats collector about our activity: basically, we truncated
-		 * the matview and inserted some new data.  (The concurrent code path
-		 * above doesn't need to worry about this because the inserts and
-		 * deletes it issues get counted by lower-level code.)
-		 */
-		pgstat_count_truncate(matviewRel);
-		if (!stmt->skipData)
-			pgstat_count_heap_insert(matviewRel, processed);
-	}
+	match_matview_with_new_data(stmt, matviewRel, matviewOid, OIDNewHeap,
+								relowner, save_sec_context, relpersistence,
+								processed);
 
 	table_close(matviewRel, NoLock);
 
@@ -373,30 +231,18 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
 /*
  * refresh_matview_datafill
  *
- * Execute the given query, sending result rows to "dest" (which will
- * insert them into the target matview).
+ * Create dest receiver and execute the given query, sending result rows to the
+ * dest receiver which will insert them into the target materialized view.
  *
  * Returns number of rows inserted.
  */
 static uint64
-refresh_matview_datafill(DestReceiver *dest, Query *query,
-						 const char *queryString)
+refresh_matview_datafill(Oid OIDNewHeap, Query *query, const char *queryString)
 {
-	List	   *rewritten;
 	PlannedStmt *plan;
 	QueryDesc  *queryDesc;
-	Query	   *copied_query;
 	uint64		processed;
-
-	/* Lock and rewrite, using a copy to preserve the original query. */
-	copied_query = copyObject(query);
-	AcquireRewriteLocks(copied_query, true, false);
-	rewritten = QueryRewrite(copied_query);
-
-	/* SELECT should never rewrite to more or less than one SELECT query */
-	if (list_length(rewritten) != 1)
-		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
-	query = (Query *) linitial(rewritten);
+	DestReceiver *dest;
 
 	/* Check for user-requested abort. */
 	CHECK_FOR_INTERRUPTS();
@@ -413,6 +259,8 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	PushCopiedSnapshot(GetActiveSnapshot());
 	UpdateActiveSnapshotCommandId();
 
+	dest = CreateTransientRelDestReceiver(OIDNewHeap);
+
 	/* Create a QueryDesc, redirecting output to our tuple receiver */
 	queryDesc = CreateQueryDesc(plan, queryString,
 								GetActiveSnapshot(), InvalidSnapshot,
@@ -933,3 +781,247 @@ CloseMatViewIncrementalMaintenance(void)
 	matview_maintenance_depth--;
 	Assert(matview_maintenance_depth >= 0);
 }
+
+/*
+ * get_matview_query
+ *
+ * Open the refresh materialized view relation, perform sanity checks and also
+ * get the associated data generating query from it.
+ *
+ * Note that the refresh materialized view relation is opened here, it has to
+ * be closed in the caller.
+ */
+static Query *
+get_matview_query(RefreshMatViewStmt *stmt, Relation *rel, Oid *objectId)
+{
+	Oid			matviewOid;
+	Relation	matviewRel;
+	RewriteRule *rule;
+	List	   *actions;
+	Query	   *dataQuery;
+	LOCKMODE	lockmode;
+
+	/* Determine strength of lock needed. */
+	lockmode = stmt->concurrent ? ExclusiveLock : AccessExclusiveLock;
+
+	/* Get a lock until end of transaction. */
+	matviewOid = RangeVarGetRelidExtended(stmt->relation, lockmode, 0,
+										  RangeVarCallbackOwnsTable, NULL);
+	matviewRel = table_open(matviewOid, NoLock);
+
+	/* Make sure it is a materialized view. */
+	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("\"%s\" is not a materialized view",
+						RelationGetRelationName(matviewRel))));
+
+	/* Check that CONCURRENTLY is not specified if not populated. */
+	if (stmt->concurrent && !RelationIsPopulated(matviewRel))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
+
+	/* Check that conflicting options have not been specified. */
+	if (stmt->concurrent && stmt->skipData)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
+
+	/*
+	 * Check that everything is correct for a refresh. Problems at this point
+	 * are internal errors, so elog is sufficient.
+	 */
+	if (matviewRel->rd_rel->relhasrules == false ||
+		matviewRel->rd_rules->numLocks < 1)
+		elog(ERROR,
+			 "materialized view \"%s\" is missing rewrite information",
+			 RelationGetRelationName(matviewRel));
+
+	if (matviewRel->rd_rules->numLocks > 1)
+		elog(ERROR,
+			 "materialized view \"%s\" has too many rules",
+			 RelationGetRelationName(matviewRel));
+
+	rule = matviewRel->rd_rules->rules[0];
+	if (rule->event != CMD_SELECT || !(rule->isInstead))
+		elog(ERROR,
+			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
+			 RelationGetRelationName(matviewRel));
+
+	actions = rule->actions;
+	if (list_length(actions) != 1)
+		elog(ERROR,
+			 "the rule for materialized view \"%s\" is not a single action",
+			 RelationGetRelationName(matviewRel));
+
+	/*
+	 * Check that there is a unique index with no WHERE clause on one or more
+	 * columns of the materialized view if CONCURRENTLY is specified.
+	 */
+	if (stmt->concurrent)
+	{
+		List	   *indexoidlist = RelationGetIndexList(matviewRel);
+		ListCell   *indexoidscan;
+		bool		hasUniqueIndex = false;
+
+		foreach(indexoidscan, indexoidlist)
+		{
+			Oid			indexoid = lfirst_oid(indexoidscan);
+			Relation	indexRel;
+
+			indexRel = index_open(indexoid, AccessShareLock);
+			hasUniqueIndex = is_usable_unique_index(indexRel);
+			index_close(indexRel, AccessShareLock);
+			if (hasUniqueIndex)
+				break;
+		}
+
+		list_free(indexoidlist);
+
+		if (!hasUniqueIndex)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot refresh materialized view \"%s\" concurrently",
+							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+													   RelationGetRelationName(matviewRel))),
+					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
+	}
+
+	/*
+	 * The stored query was rewritten at the time of the MV definition, but
+	 * has not been scribbled on by the planner.
+	 */
+	dataQuery = linitial_node(Query, actions);
+
+	/*
+	 * Check for active uses of the relation in the current transaction, such
+	 * as open scans.
+	 *
+	 * NB: We count on this to protect us against problems with refreshing the
+	 * data using TABLE_INSERT_FROZEN.
+	 */
+	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
+
+	*rel = matviewRel;
+	*objectId = matviewOid;
+
+	return dataQuery;
+}
+
+/*
+ * get_new_heap_oid
+ *
+ * Create a new heap and return its oid to which the refresh materialized view
+ * data is inserted into.
+ */
+static Oid
+get_new_heap_oid(RefreshMatViewStmt *stmt, Relation matviewRel, Oid matviewOid,
+				 char *relpersistence)
+{
+	Oid			OIDNewHeap;
+	bool		concurrent;
+	Oid			tableSpace;
+
+	concurrent = stmt->concurrent;
+
+	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
+	if (concurrent)
+	{
+		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
+		*relpersistence = RELPERSISTENCE_TEMP;
+	}
+	else
+	{
+		tableSpace = matviewRel->rd_rel->reltablespace;
+		*relpersistence = matviewRel->rd_rel->relpersistence;
+	}
+
+	/*
+	 * Create the transient table that will receive the regenerated data. Lock
+	 * it against access by any other process until commit (by which time it
+	 * will be gone).
+	 */
+	OIDNewHeap = make_new_heap(matviewOid, tableSpace, *relpersistence,
+							   ExclusiveLock);
+	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
+
+	return OIDNewHeap;
+}
+
+/*
+ * match_matview_with_new_data
+ *
+ * Arrange the materialized view newly generated data to match the existing
+ * data i.e merge in case of CONCURRENTLY otherwise perform heap swap and
+ * truncate the materialized view.
+ */
+static void
+match_matview_with_new_data(RefreshMatViewStmt *stmt, Relation matviewRel,
+							Oid matviewOid, Oid OIDNewHeap, Oid relowner,
+							int save_sec_context, char relpersistence,
+							uint64 processed)
+{
+	/* Make the materialized view match the newly generated data. */
+	if (stmt->concurrent)
+	{
+		int			old_depth = matview_maintenance_depth;
+
+		PG_TRY();
+		{
+			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
+								   save_sec_context);
+		}
+		PG_CATCH();
+		{
+			matview_maintenance_depth = old_depth;
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+		Assert(matview_maintenance_depth == old_depth);
+	}
+	else
+	{
+		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
+
+		/*
+		 * Inform stats collector about our activity: basically, we truncated
+		 * the materialized view and inserted some new data.  (The concurrent
+		 * code path above doesn't need to worry about this because the
+		 * inserts and deletes it issues get counted by lower-level code.)
+		 */
+		pgstat_count_truncate(matviewRel);
+		if (!stmt->skipData)
+			pgstat_count_heap_insert(matviewRel, processed);
+	}
+}
+
+/*
+ * rewrite_refresh_matview_query
+ *
+ * Rewrite the refresh materialized view data generating query.
+ *
+ * Work on the copied query to preserve the original query. Because the
+ * rewriter and planner tend to scribble on the input, we make a preliminary
+ * copy of the source querytree.  This prevents problems in the case that
+ * REFRESH MATERIALIZED VIEW is in a portal or plpgsql function and is executed
+ * repeatedly. (See also the same hack in EXPLAIN and PREPARE.)
+ */
+static Query *
+rewrite_refresh_matview_query(Query *dataQuery)
+{
+	List	   *rewritten;
+	Query	   *copied_query;
+
+	/* Lock and rewrite, using a copy to preserve the original query. */
+	copied_query = copyObject(dataQuery);
+	AcquireRewriteLocks(copied_query, true, false);
+	rewritten = QueryRewrite(copied_query);
+
+	/* SELECT should never rewrite to more or less than one SELECT query */
+	if (list_length(rewritten) != 1)
+		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
+	dataQuery = (Query *) linitial(rewritten);
+
+	return dataQuery;
+}
-- 
2.25.1

