commit 8bad9c111f003eee02e504f1d8cb0efd0caf58b3
Author: Robert Haas <rhaas@postgresql.org>
Date:   Wed Jan 17 10:44:58 2018 -0500

    propagate-reindex-state-v1

diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index 5c33c40ae9..32994719e3 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -122,6 +122,9 @@ worker.  This includes:
     values are restored, this incidentally sets SessionUserId and OuterUserId
     to the correct values.  This final step restores CurrentUserId.
 
+  - State related to pending REINDEX operations, which prevents access to
+    an index that is currently being rebuilt.
+
 To prevent undetected or unprincipled deadlocks when running in parallel mode,
 this could should eventually handle heavyweight locks in some way.  This is
 not implemented yet.
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index f720896e50..0a0157a878 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -18,6 +18,7 @@
 #include "access/session.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "executor/execParallel.h"
@@ -67,6 +68,7 @@
 #define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
 #define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
 #define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_REINDEX_STATE			UINT64CONST(0xFFFFFFFFFFFF000B)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	Size		tsnaplen = 0;
 	Size		asnaplen = 0;
 	Size		tstatelen = 0;
+	Size		reindexlen = 0;
 	Size		segsize = 0;
 	int			i;
 	FixedParallelState *fps;
@@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		tstatelen = EstimateTransactionStateSpace();
 		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
 		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
+		reindexlen = EstimateReindexStateSpace();
+		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
 		/* If you add more chunks here, you probably need to add keys. */
-		shm_toc_estimate_keys(&pcxt->estimator, 7);
+		shm_toc_estimate_keys(&pcxt->estimator, 8);
 
 		/* Estimate space need for error queues. */
 		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *tsnapspace;
 		char	   *asnapspace;
 		char	   *tstatespace;
+		char	   *reindexspace;
 		char	   *error_queue_space;
 		char	   *session_dsm_handle_space;
 		char	   *entrypointstate;
@@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		SerializeTransactionState(tstatelen, tstatespace);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
 
+		/* Serialize reindex state. */
+		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
+		SerializeReindexState(reindexlen, reindexspace);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+
 		/* Allocate space for worker information. */
 		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
 
@@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg)
 	char	   *tsnapspace;
 	char	   *asnapspace;
 	char	   *tstatespace;
+	char	   *reindexspace;
 	StringInfoData msgbuf;
 	char	   *session_dsm_handle_space;
 
@@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg)
 	/* Set ParallelMasterBackendId so we know how to address temp relations. */
 	ParallelMasterBackendId = fps->parallel_master_backend_id;
 
+	/* Restore reindex state. */
+	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
+	RestoreReindexState(reindexspace);
+
 	/*
 	 * We've initialized all of our state now; nothing should change
 	 * hereafter.
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 330488b96f..155c36ecf1 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -86,6 +86,14 @@ typedef struct
 				tups_inserted;
 } v_i_state;
 
+typedef struct
+{
+	Oid			currentlyReindexedHeap;
+	Oid			currentlyReindexedIndex;
+	int			numPendingReindexedIndexes;
+	Oid			pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedReindexState;
+
 /* non-export function prototypes */
 static bool relationHasPrimaryKey(Relation rel);
 static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
@@ -3719,6 +3727,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
 static void
 ResetReindexProcessing(void)
 {
+	if (IsInParallelMode())
+		elog(ERROR, "cannot modify reindex state during a parallel operation");
 	currentlyReindexedHeap = InvalidOid;
 	currentlyReindexedIndex = InvalidOid;
 }
@@ -3736,6 +3746,8 @@ SetReindexPending(List *indexes)
 	/* Reindexing is not re-entrant. */
 	if (pendingReindexedIndexes)
 		elog(ERROR, "cannot reindex while reindexing");
+	if (IsInParallelMode())
+		elog(ERROR, "cannot modify reindex state during a parallel operation");
 	pendingReindexedIndexes = list_copy(indexes);
 }
 
@@ -3746,6 +3758,8 @@ SetReindexPending(List *indexes)
 static void
 RemoveReindexPending(Oid indexOid)
 {
+	if (IsInParallelMode())
+		elog(ERROR, "cannot modify reindex state during a parallel operation");
 	pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
 											  indexOid);
 }
@@ -3757,5 +3771,59 @@ RemoveReindexPending(Oid indexOid)
 static void
 ResetReindexPending(void)
 {
+	if (IsInParallelMode())
+		elog(ERROR, "cannot modify reindex state during a parallel operation");
 	pendingReindexedIndexes = NIL;
 }
+
+/*
+ * EstimateReindexStateSpace
+ *		Estimate space needed to pass reindex state to parallel workers.
+ */
+extern Size
+EstimateReindexStateSpace(void)
+{
+	return offsetof(SerializedReindexState, pendingReindexedIndexes)
+		+ mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
+}
+
+/*
+ * SerializeReindexState
+ *		Serialize reindex state for parallel workers.
+ */
+extern void
+SerializeReindexState(Size maxsize, char *start_address)
+{
+	SerializedReindexState *sistate = (SerializedReindexState *) start_address;
+	int			c = 0;
+	ListCell   *lc;
+
+	sistate->currentlyReindexedHeap = currentlyReindexedHeap;
+	sistate->currentlyReindexedIndex = currentlyReindexedIndex;
+	sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
+	foreach(lc, pendingReindexedIndexes)
+		sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
+}
+
+/*
+ * RestoreReindexState
+ *		Restore reindex state in a parallel worker.
+ */
+extern void
+RestoreReindexState(void *reindexstate)
+{
+	SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
+	int			c = 0;
+	MemoryContext	oldcontext;
+
+	currentlyReindexedHeap = sistate->currentlyReindexedHeap;
+	currentlyReindexedIndex = sistate->currentlyReindexedIndex;
+
+	Assert(pendingReindexedIndexes == NIL);
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
+		pendingReindexedIndexes =
+			lappend_oid(pendingReindexedIndexes,
+						sistate->pendingReindexedIndexes[c]);
+	MemoryContextSwitchTo(oldcontext);
+}
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 12bf35567a..4790f0c735 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
 extern Oid	IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern Size EstimateReindexStateSpace(void);
+extern void SerializeReindexState(Size maxsize, char *start_address);
+extern void RestoreReindexState(void *reindexstate);
+
 #endif							/* INDEX_H */
