From 64258696f1e648cf5f7e05803c849c3f85be5c3a Mon Sep 17 00:00:00 2001
From: "Imseih (AWS)" <simseih@88665a22795f.ant.amazon.com>
Date: Tue, 7 Feb 2023 19:39:42 -0600
Subject: [PATCH 1/1] Report index vacuum progress.

Add 2 new columns to pg_stat_progress_vacuum.
The columns are ndexes_total as the total indexes to be vacuumed or cleaned and
indexes_processed as the number of indexes vacuumed or cleaned up so far.

Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada
Reviewed by: Nathan Bossart, Masahiko Sawada
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com
---
 doc/src/sgml/monitoring.sgml          | 31 +++++++++++
 src/backend/access/heap/vacuumlazy.c  | 80 +++++++++++++++++++++++----
 src/backend/access/transam/parallel.c | 16 ++++++
 src/backend/catalog/system_views.sql  |  3 +-
 src/backend/commands/vacuumparallel.c | 55 +++++++++++++++++-
 src/include/access/parallel.h         |  5 ++
 src/include/commands/progress.h       |  2 +
 src/include/commands/vacuum.h         |  1 +
 src/test/regress/expected/rules.out   |  4 +-
 9 files changed, 184 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1756f1a4b6..2796d92f99 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1779,6 +1779,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry><literal>ParallelFinish</literal></entry>
       <entry>Waiting for parallel workers to finish computing.</entry>
      </row>
+     <row>
+      <entry><literal>ParallelVacuumFinish</literal></entry>
+      <entry>Waiting for parallel vacuum workers to finish index vacuum.</entry>
+     </row>
      <row>
       <entry><literal>ProcArrayGroupUpdate</literal></entry>
       <entry>Waiting for the group leader to clear the transaction ID at
@@ -6883,6 +6887,33 @@ FROM pg_stat_get_backend_idset() AS backendid;
        Number of dead tuples collected since the last index vacuum cycle.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of indexes that will be vacuumed or cleaned up. This value will be
+       <literal>0</literal> if the phase is not <literal>vacuuming indexes</literal>
+       or <literal>cleaning up indexes</literal>, <literal>INDEX_CLEANUP</literal>
+       is set to <literal>OFF</literal>, index vacuum is skipped due to very
+       few dead tuples in the table, or vacuum failsafe is triggered.
+       See <xref linkend="guc-vacuum-failsafe-age"/>
+       for more on vacuum failsafe.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_completed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of indexes vacuumed in the current vacuum cycle when the
+       phase is <literal>vacuuming indexes</literal>, or the number
+       of indexes cleaned up during the <literal>cleaning up indexes</literal>
+       phase.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 8f14cf85f3..28bac92591 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2316,6 +2316,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
 	bool		allindexes = true;
 	double		old_live_tuples = vacrel->rel->rd_rel->reltuples;
+	const int progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEX_TOTAL
+	};
+	const int progress_end_index[] = {
+		PROGRESS_VACUUM_INDEX_TOTAL,
+		PROGRESS_VACUUM_INDEX_COMPLETED,
+		PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+	};
+	int64       progress_start_val[2];
+	int64       progress_end_val[3];
 
 	Assert(vacrel->nindexes > 0);
 	Assert(vacrel->do_index_vacuuming);
@@ -2328,9 +2339,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 		return false;
 	}
 
-	/* Report that we are now vacuuming indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+	/*
+	 * Report that we are now vacuuming indexes
+	 * and the number of indexes to vacuum.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2343,6 +2358,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 														  old_live_tuples,
 														  vacrel);
 
+			/* Done vacuuming an index -- increment the indexes completed */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED,
+										 idx + 1);
+
 			if (lazy_check_wraparound_failsafe(vacrel))
 			{
 				/* Wraparound emergency -- end current index scan */
@@ -2377,14 +2396,18 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	Assert(allindexes || vacrel->failsafe_active);
 
 	/*
-	 * Increase and report the number of index scans.
+	 * Reset and report the total indexes to vacuum and the number of
+	 * indexes vacuumed.
+	 * Also, increase and report the number of index scans completed.
 	 *
 	 * We deliberately include the case where we started a round of bulk
 	 * deletes that we weren't able to finish due to the failsafe triggering.
 	 */
 	vacrel->num_index_scans++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
-								 vacrel->num_index_scans);
+	progress_end_val[0] = 0;
+	progress_end_val[1] = 0;
+	progress_end_val[2] = vacrel->num_index_scans;
+	pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val);
 
 	return allindexes;
 }
@@ -2621,12 +2644,23 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 
 	if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs)))
 	{
+		const int   progress_index[] = {
+			PROGRESS_VACUUM_INDEX_TOTAL,
+			PROGRESS_VACUUM_INDEX_COMPLETED
+		};
+		int64       progress_val[2] = {0, 0};
+
 		vacrel->failsafe_active = true;
 
-		/* Disable index vacuuming, index cleanup, and heap rel truncation */
+		/*
+		 * Disable index vacuuming, index cleanup, and heap rel truncation.
+		 *
+		 * Also, report that we are no longer tracking index vacuum/cleanup.
+		 */
 		vacrel->do_index_vacuuming = false;
 		vacrel->do_index_cleanup = false;
 		vacrel->do_rel_truncate = false;
+		pgstat_progress_update_multi_param(2, progress_index, progress_val);
 
 		ereport(WARNING,
 				(errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans",
@@ -2654,13 +2688,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
 	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	const int progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEX_TOTAL
+	};
+	const int progress_end_index[] = {
+		PROGRESS_VACUUM_INDEX_TOTAL,
+		PROGRESS_VACUUM_INDEX_COMPLETED
+	};
+	int64       progress_start_val[2];
+	int64       progress_end_val[2];
 
 	Assert(vacrel->do_index_cleanup);
 	Assert(vacrel->nindexes > 0);
 
-	/* Report that we are now cleaning up indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+	/*
+	 * Report that we are now cleaning up indexes
+	 * and the number of indexes to cleanup.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2672,6 +2720,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 			vacrel->indstats[idx] =
 				lazy_cleanup_one_index(indrel, istat, reltuples,
 									   estimated_count, vacrel);
+
+			/* Done cleaning an index -- increment the indexes completed */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED,
+										 idx + 1);
 		}
 	}
 	else
@@ -2681,6 +2733,14 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 											vacrel->num_index_scans,
 											estimated_count);
 	}
+
+	/*
+	 * Reset and report the total number of indexes to cleanup
+	 * and the number of indexes cleaned.
+	 */
+	progress_end_val[0] = 0;
+	progress_end_val[1] = 0;
+	pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val);
 }
 
 /*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 9e3ec0d5d8..a321f76999 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -185,6 +185,8 @@ CreateParallelContext(const char *library_name, const char *function_name,
 	pcxt->library_name = pstrdup(library_name);
 	pcxt->function_name = pstrdup(function_name);
 	pcxt->error_context_stack = error_context_stack;
+	pcxt->parallel_progress_callback = NULL;
+	pcxt->parallel_progress_callback_arg = NULL;
 	shm_toc_initialize_estimator(&pcxt->estimator);
 	dlist_push_head(&pcxt_list, &pcxt->node);
 
@@ -1199,6 +1201,20 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 				break;
 			}
 
+		case 'P':				/* Parallel progress reporting */
+			{
+				/*
+				 * A Leader process that receives this message
+				 * must be ready to update progress.
+				 */
+				Assert(pcxt->parallel_progress_callback);
+
+				/* Report progress */
+				pcxt->parallel_progress_callback(pcxt->parallel_progress_callback_arg);
+
+				break;
+			}
+
 		case 'X':				/* Terminate, indicating clean exit */
 			{
 				shm_mq_detach(pcxt->worker[i].error_mqh);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8608e3fa5b..799587dcd7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1165,7 +1165,8 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
+        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param8 AS indexes_total, S.param9 AS indexes_completed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index bcd40c80a1..9e5a300ba4 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -30,7 +30,9 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/index.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
+#include "libpq/libpq.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -103,6 +105,17 @@ typedef struct PVShared
 
 	/* Counter for vacuuming and cleanup */
 	pg_atomic_uint32 idx;
+
+	/*
+	 * Counter for vacuuming and cleanup progress reporting.
+	 * This value is used to report index vacuum/cleanup progress
+	 * in parallel_vacuum_progress_report. We keep this
+	 * counter to avoid having to loop through
+	 * ParallelVacuumState->indstats to determine the number
+	 * of indexes completed.
+	 */
+	pg_atomic_uint32 nindexes_completed;
+
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -272,6 +285,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 								 parallel_workers);
 	Assert(pcxt->nworkers > 0);
 	pvs->pcxt = pcxt;
+	pcxt->parallel_progress_callback = parallel_vacuum_update_progress;
+	pcxt->parallel_progress_callback_arg = pvs;
 
 	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
 	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
@@ -364,6 +379,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
+	pg_atomic_init_u32(&(shared->nindexes_completed), 0);
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
@@ -618,8 +634,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 													vacuum));
 	}
 
-	/* Reset the parallel index processing counter */
+	/* Reset the parallel index processing and progress counters */
 	pg_atomic_write_u32(&(pvs->shared->idx), 0);
+	pg_atomic_write_u32(&(pvs->shared->nindexes_completed), 0);
 
 	/* Setup the shared cost-based vacuum delay and launch workers */
 	if (nworkers > 0)
@@ -888,6 +905,22 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 	pfree(pvs->indname);
 	pvs->indname = NULL;
+
+	/*
+	 * Update index vacuum progress.
+	 *
+	 * When a parallel worker completes an
+	 * index vacuum, it sends a protocol message
+	 * to notify the leader. The leader then
+	 * updates the progress. See HandleParallelMessage().
+	 *
+	 * When a leader performs the index vacuum,
+	 * it can update the progress directly.
+	 */
+	if (IsParallelWorker())
+		pq_putmessage('P', NULL, 0);
+	else
+		parallel_vacuum_update_progress(pvs);
 }
 
 /*
@@ -1072,3 +1105,23 @@ parallel_vacuum_error_callback(void *arg)
 			return;
 	}
 }
+
+/*
+ * Read pvs->shared->nindexes_completed and report the number of indexes
+ * vacuumed so far.
+ *
+ * Note: This function should be called by the leader process only,
+ * and it's up to the caller to ensure this.
+ */
+void
+parallel_vacuum_update_progress(void *arg)
+{
+	ParallelVacuumState *pvs = (ParallelVacuumState *)arg;
+
+	Assert(!IsParallelWorker());
+	Assert(pvs->pcxt->parallel_progress_callback_arg);
+
+	if (pvs)
+		pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED,
+									 pg_atomic_add_fetch_u32(&(pvs->shared->nindexes_completed), 1));
+}
\ No newline at end of file
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 061f8a4c4c..7ddc71dae2 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -20,6 +20,9 @@
 #include "storage/shm_mq.h"
 #include "storage/shm_toc.h"
 
+/* progress callback definition */
+typedef void (*ParallelProgressCallback) (void *parallel_progress_callback_state);
+
 typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
 
 typedef struct ParallelWorkerInfo
@@ -46,6 +49,8 @@ typedef struct ParallelContext
 	ParallelWorkerInfo *worker;
 	int			nknown_attached_workers;
 	bool	   *known_attached_workers;
+	ParallelProgressCallback parallel_progress_callback;
+	void		*parallel_progress_callback_arg;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index e5add41352..6b8b609a4f 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -25,6 +25,8 @@
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_INDEX_TOTAL             7
+#define PROGRESS_VACUUM_INDEX_COMPLETED         8
 
 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */
 #define PROGRESS_VACUUM_PHASE_SCAN_HEAP			1
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 689dbb7702..7b13069d33 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -370,5 +370,6 @@ extern bool std_typanalyze(VacAttrStats *stats);
 extern double anl_random_fract(void);
 extern double anl_init_selection_state(int n);
 extern double anl_get_next_S(double t, int n, double *stateptr);
+extern void parallel_vacuum_update_progress(void *arg);
 
 #endif							/* VACUUM_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e7a2f5856a..a20978d335 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2021,7 +2021,9 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
+    s.param7 AS num_dead_tuples,
+    s.param8 AS indexes_total,
+    s.param9 AS indexes_completed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_recovery_prefetch| SELECT stats_reset,
-- 
2.37.1 (Apple Git-137.1)

