From 8b205bd8a109892779dc804fb75d06d9a71db396 Mon Sep 17 00:00:00 2001
From: Timothy Garnett <tgarnett@panjiva.com>
Date: Tue, 23 Apr 2013 11:21:25 -0400
Subject: [PATCH 1/1] patch pg_restore to support partial parallel restore
 (parallel restore of indexes/constraints, but not data) of custom format
 dumps received via stdin

---
 src/bin/pg_dump/pg_backup_archiver.c | 66 ++++++++++++++++++++++--------------
 src/bin/pg_dump/pg_backup_custom.c   |  5 +++
 2 files changed, 45 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index c176b65..537b0c1 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -115,7 +115,6 @@ typedef struct _outputContext
 /* translator: this is a module name */
 static const char *modulename = gettext_noop("archiver");
 
-
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 		 const int compression, ArchiveMode mode);
 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
@@ -316,18 +315,12 @@ RestoreArchive(Archive *AHX)
 	if (parallel_mode)
 	{
 		/* We haven't got round to making this work for all archive formats */
-		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
+		if (AH->ClonePtr == NULL)
 			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
 
 		/* Doesn't work if the archive represents dependencies as OIDs */
 		if (AH->version < K_VERS_1_8)
 			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
-
-		/*
-		 * It's also not gonna work if we can't reopen the input file, so
-		 * let's try that immediately.
-		 */
-		(AH->ReopenPtr) (AH);
 	}
 
 	/*
@@ -3465,12 +3458,19 @@ on_exit_close_archive(Archive *AHX)
  * Main engine for parallel restore.
  *
  * Work is done in three phases.
- * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
- * just as for a standard restore.	Second we process the remaining non-ACL
- * steps in parallel worker children (threads on Windows, processes on Unix),
- * each of which connects separately to the database.  Finally we process all
- * the ACL entries in a single connection (that happens back in
- * RestoreArchive).
+ * 1) First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+ *    just as for a standard restore. If we are unable to reopen the archive
+ *    (say from STDIN) we do SECTION_DATA tocEntries here as well.
+ * 2) Second we process the remaining non-ACL steps in parallel worker children
+ *    (threads on Windows, processes on Unix), each of which connects separately
+ *    to the database.
+ * 3) Finally we process all the ACL entries in a single connection
+ *    (that happens back in RestoreArchive).
+ *
+ * As a future improvement, when the archive cannot be reopened, it
+ * should be possible to do SECTION_POST_DATA entries in parallel with the
+ * SECTION_DATA entries as their dependencies are met.  But that requires
+ * some re-working of the flow here.
  */
 static void
 restore_toc_entries_parallel(ArchiveHandle *AH)
@@ -3520,23 +3520,32 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 		/* NB: process-or-continue logic must be the inverse of loop below */
 		if (next_work_item->section != SECTION_PRE_DATA)
 		{
-			/* DATA and POST_DATA items are just ignored for now */
-			if (next_work_item->section == SECTION_DATA ||
-				next_work_item->section == SECTION_POST_DATA)
+			/* Detect if we can re-open the archive (needed for parallel data
+			 * restore). If not, process SECTION_DATA entries as well. */
+			if (next_work_item->section == SECTION_DATA && AH->ReopenPtr == NULL)
 			{
-				skipped_some = true;
-				continue;
+				/* fall through to process toc entry */
 			}
 			else
 			{
-				/*
-				 * SECTION_NONE items, such as comments, can be processed now
-				 * if we are still in the PRE_DATA part of the archive.  Once
-				 * we've skipped any items, we have to consider whether the
-				 * comment's dependencies are satisfied, so skip it for now.
-				 */
-				if (skipped_some)
+				/* DATA and POST_DATA items are just ignored for now */
+				if (next_work_item->section == SECTION_DATA ||
+					next_work_item->section == SECTION_POST_DATA)
+				{
+					skipped_some = true;
 					continue;
+				}
+				else
+				{
+					/*
+					 * SECTION_NONE items, such as comments, can be processed now
+					 * if we are still in the PRE_DATA part of the archive.  Once
+					 * we've skipped any items, we have to consider whether the
+					 * comment's dependencies are satisfied, so skip it for now.
+					 */
+					if (skipped_some)
+						continue;
+				}
 			}
 		}
 
@@ -3595,6 +3604,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 			/* All PRE_DATA items were dealt with above */
 			continue;
 		}
+		if (next_work_item->section == SECTION_DATA && AH->ReopenPtr == NULL)
+		{
+			/* If not able to reopen we handled SECTION_DATA above */
+			continue;
+		}
 		if (next_work_item->section == SECTION_DATA ||
 			next_work_item->section == SECTION_POST_DATA)
 		{
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index f7dc5be..4c9aa44 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -184,6 +184,11 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
 		ctx->dataStart = _getFilePos(AH, ctx);
 	}
 
+	/* clear the ReopenPtr if we can't actually reopen the file (say STDIN) */
+	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0 || !ctx->hasSeek)
+	{
+		AH->ReopenPtr = NULL;
+	}
 }
 
 /*
-- 
1.8.2.1

