From 3c40cbcf33478e99ee70be7667cc89640049f014 Mon Sep 17 00:00:00 2001
From: Timothy Garnett <tgarnett@panjiva.com>
Date: Tue, 23 Apr 2013 18:22:51 -0400
Subject: [PATCH 3/3] patch pg_restore to allow parallel restore when the
 archive file is piped in via STDIN, data is still loaded serially in parallel
 but we parallelize indexes and constraints and intermix them with the data
 loading, because data loading blocks dispatching it's possible we could do
 slightly better, but this is pretty good

---
 src/bin/pg_dump/pg_backup_archiver.c | 219 +++++++++++++++++++++++------------
 src/bin/pg_dump/pg_backup_custom.c   |   5 +
 2 files changed, 151 insertions(+), 73 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index c176b65..df6fe22 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -151,6 +151,8 @@ static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
 				  RestoreOptions *ropt, bool is_parallel);
 static void restore_toc_entries_parallel(ArchiveHandle *AH);
+static void parallel_restore_close_db_connection(ArchiveHandle *AH);
+static void parallel_restore_reestablish_db_connection(ArchiveHandle *AH);
 static thandle spawn_restore(RestoreArgs *args);
 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
 static bool work_in_progress(ParallelSlot *slots, int n_slots);
@@ -160,7 +162,7 @@ static void par_list_append(TocEntry *l, TocEntry *te);
 static void par_list_remove(TocEntry *te);
 static TocEntry *get_next_work_item(ArchiveHandle *AH,
 				   TocEntry *ready_list,
-				   ParallelSlot *slots, int n_slots);
+				   ParallelSlot *slots, int n_slots, bool pref_non_data);
 static parallel_restore_result parallel_restore(RestoreArgs *args);
 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 			   thandle worker, int status,
@@ -171,6 +173,7 @@ static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
 					TocEntry *ready_list);
+static void add_dependency(TocEntry *te, DumpId refId);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
@@ -316,18 +319,12 @@ RestoreArchive(Archive *AHX)
 	if (parallel_mode)
 	{
 		/* We haven't got round to making this work for all archive formats */
-		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
+		if (AH->ClonePtr == NULL)
 			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
 
 		/* Doesn't work if the archive represents dependencies as OIDs */
 		if (AH->version < K_VERS_1_8)
 			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
-
-		/*
-		 * It's also not gonna work if we can't reopen the input file, so
-		 * let's try that immediately.
-		 */
-		(AH->ReopenPtr) (AH);
 	}
 
 	/*
@@ -3461,16 +3458,60 @@ on_exit_close_archive(Archive *AHX)
 	on_exit_nicely(archive_close_connection, &shutdown_info);
 }
 
+void
+parallel_restore_close_db_connection(ArchiveHandle *AH)
+{
+	DisconnectDatabase(&AH->public);
+
+	/* blow away any transient state from the old connection */
+	if (AH->currUser)
+		free(AH->currUser);
+	AH->currUser = NULL;
+	if (AH->currSchema)
+		free(AH->currSchema);
+	AH->currSchema = NULL;
+	if (AH->currTablespace)
+		free(AH->currTablespace);
+	AH->currTablespace = NULL;
+	AH->currWithOids = -1;
+}
+
+void
+parallel_restore_reestablish_db_connection(ArchiveHandle *AH)
+{
+	RestoreOptions *ropt = AH->ropt;
+
+	ConnectDatabase((Archive *) AH, ropt->dbname,
+					ropt->pghost, ropt->pgport, ropt->username,
+					ropt->promptPassword);
+
+	_doSetFixedOutputState(AH);
+}
+
 /*
  * Main engine for parallel restore.
  *
  * Work is done in three phases.
- * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
- * just as for a standard restore.	Second we process the remaining non-ACL
- * steps in parallel worker children (threads on Windows, processes on Unix),
- * each of which connects separately to the database.  Finally we process all
- * the ACL entries in a single connection (that happens back in
- * RestoreArchive).
+ * 1) First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+ *    just as for a standard restore.
+ * 2) Second we process the remaining non-ACL steps in parallel worker children
+ *    (threads on Windows, processes on Unix), each of which connects separately
+ *    to the database. In the case where we are unable to reopen the archive
+ *    (STDIN) we add a dependency between each SECTION_DATA item and the next
+ *    so they are processed one at a time and in order obviating the need to
+ *    reopen the archive. This is handled by fix_dependencies.
+ * 3) Finally we process all the ACL entries in a single connection
+ *    (that happens back in RestoreArchive).
+ *
+ * NOTE - in the case where we are unable to reopen the archive, we load data
+ * in the parent thread which blocks dispatching other work entries.  We do this
+ * to avoid issues with the archive file state and the need to communicate that
+ * up to the parent thread from any data loading child. To keep the parallelism
+ * up we favor non-data loading tasks over data loading tasks, but this does
+ * limit parallelism slightly over the optimal case where one thread would
+ * continually load data.  However tracking that would seem to involve a lot
+ * of extra complexity around inter-process communication and this gets us
+ * pretty close.
  */
 static void
 restore_toc_entries_parallel(ArchiveHandle *AH)
@@ -3555,7 +3596,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 	 * mainly to ensure that we don't exceed the specified number of parallel
 	 * connections.
 	 */
-	DisconnectDatabase(&AH->public);
+	parallel_restore_close_db_connection(AH);
 
 	/*
 	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
@@ -3563,18 +3604,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 	 */
 	shutdown_info.pstate = pstate;
 
-	/* blow away any transient state from the old connection */
-	if (AH->currUser)
-		free(AH->currUser);
-	AH->currUser = NULL;
-	if (AH->currSchema)
-		free(AH->currSchema);
-	AH->currSchema = NULL;
-	if (AH->currTablespace)
-		free(AH->currTablespace);
-	AH->currTablespace = NULL;
-	AH->currWithOids = -1;
-
 	/*
 	 * Initialize the lists of pending and ready items.  After this setup, the
 	 * pending list is everything that needs to be done but is blocked by one
@@ -3623,8 +3652,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
 	ahlog(AH, 1, "entering main parallel loop\n");
 
+	/* Since we'll block while loading data when we can't reopen the file,
+	 * we prefer non-data work items in the selection here for parallelism */
 	while ((next_work_item = get_next_work_item(AH, &ready_list,
-												slots, n_slots)) != NULL ||
+												slots, n_slots, AH->ReopenPtr == NULL)) != NULL ||
 		   work_in_progress(slots, n_slots))
 	{
 		if (next_work_item != NULL)
@@ -3645,29 +3676,52 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
 			if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
 			{
-				/* There is work still to do and a worker slot available */
-				thandle		child;
-				RestoreArgs *args;
+				/* to avoid the need for inter-process communication around file
+				 * positioning etc. we do all data loading in the main loop */
+				if (AH->ReopenPtr == NULL && next_work_item->section == SECTION_DATA)
+				{
+					/* Load data, this will block dispatching work items till finished */
+					par_list_remove(next_work_item);
+					/* re-establish db connection, we know there's at least on slot
+					 * connection slot free so not exceeded requested max */
+					shutdown_info.pstate = NULL;
+					parallel_restore_reestablish_db_connection(AH);
 
-				ahlog(AH, 1, "launching item %d %s %s\n",
-					  next_work_item->dumpId,
-					  next_work_item->desc, next_work_item->tag);
+					(void) restore_toc_entry(AH, next_work_item, ropt, false);
+					reduce_dependencies(AH, next_work_item, &ready_list);
 
-				par_list_remove(next_work_item);
+					/* Release db connection so can be used in parallel children */
+					parallel_restore_close_db_connection(AH);
+					shutdown_info.pstate = pstate;
+
+					continue;
+				}
+				else
+				{
+					/* There is work still to do and a worker slot available */
+					thandle		child;
+					RestoreArgs *args;
 
-				/* this memory is dealloced in mark_work_done() */
-				args = pg_malloc(sizeof(RestoreArgs));
-				args->AH = CloneArchive(AH);
-				args->te = next_work_item;
-				args->pse = &pstate->pse[next_slot];
+					ahlog(AH, 1, "launching item %d %s %s\n",
+						  next_work_item->dumpId,
+						  next_work_item->desc, next_work_item->tag);
 
-				/* run the step in a worker child */
-				child = spawn_restore(args);
+					par_list_remove(next_work_item);
 
-				slots[next_slot].child_id = child;
-				slots[next_slot].args = args;
+					/* this memory is dealloced in mark_work_done() */
+					args = pg_malloc(sizeof(RestoreArgs));
+					args->AH = CloneArchive(AH);
+					args->te = next_work_item;
+					args->pse = &pstate->pse[next_slot];
 
-				continue;
+					/* run the step in a worker child */
+					child = spawn_restore(args);
+
+					slots[next_slot].child_id = child;
+					slots[next_slot].args = args;
+
+					continue;
+				}
 			}
 		}
 
@@ -3703,11 +3757,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 	/*
 	 * Now reconnect the single parent connection.
 	 */
-	ConnectDatabase((Archive *) AH, ropt->dbname,
-					ropt->pghost, ropt->pgport, ropt->username,
-					ropt->promptPassword);
-
-	_doSetFixedOutputState(AH);
+	parallel_restore_reestablish_db_connection(AH);
 
 	/*
 	 * Make sure there is no non-ACL work left due to, say, circular
@@ -3907,35 +3957,18 @@ par_list_remove(TocEntry *te)
  * The caller must do that after successfully dispatching the item.
  *
  * pref_non_data is for an alternative selection algorithm that gives
- * preference to non-data items if there is already a data load running.
- * It is currently disabled.
+ * preference to non-data items
  */
 static TocEntry *
 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
-				   ParallelSlot *slots, int n_slots)
+                                  ParallelSlot *slots, int n_slots, bool pref_non_data)
 {
-	bool		pref_non_data = false;	/* or get from AH->ropt */
 	TocEntry   *data_te = NULL;
 	TocEntry   *te;
 	int			i,
 				k;
 
 	/*
-	 * Bogus heuristics for pref_non_data
-	 */
-	if (pref_non_data)
-	{
-		int			count = 0;
-
-		for (k = 0; k < n_slots; k++)
-			if (slots[k].args->te != NULL &&
-				slots[k].args->te->section == SECTION_DATA)
-				count++;
-		if (n_slots == 0 || count * 4 < n_slots)
-			pref_non_data = false;
-	}
-
-	/*
 	 * Search the ready_list until we find a suitable item.
 	 */
 	for (te = ready_list->par_next; te != ready_list; te = te->par_next)
@@ -4102,6 +4135,10 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 /*
  * Process the dependency information into a form useful for parallel restore.
  *
+ * In the case where we can't reopen the archive file (STDIN) we add
+ * dependencies to ensure the data sections are dumped serially and in order
+ * to match file pointer progression.
+ *
  * This function takes care of fixing up some missing or badly designed
  * dependencies, and then prepares subsidiary data structures that will be
  * used in the main parallel-restore logic, including:
@@ -4116,6 +4153,7 @@ static void
 fix_dependencies(ArchiveHandle *AH)
 {
 	TocEntry   *te;
+	TocEntry   *last_data_te = NULL;
 	int			i;
 
 	/*
@@ -4155,10 +4193,7 @@ fix_dependencies(ArchiveHandle *AH)
 				{
 					if (strcmp(te2->desc, "BLOBS") == 0)
 					{
-						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
-						te->dependencies[0] = te2->dumpId;
-						te->nDeps++;
-						te->depCount++;
+						add_dependency(te, te2->dumpId);
 						break;
 					}
 				}
@@ -4168,6 +4203,25 @@ fix_dependencies(ArchiveHandle *AH)
 	}
 
 	/*
+	 * Add dependencies to ensure SECTION_DATA items are dumped serially and in
+	 * order in the case where we are unable to reopen the archive file (STDIN)
+	 */
+	if (AH->ReopenPtr == NULL)
+	{
+		for (te = AH->toc->next; te != AH->toc; te = te->next)
+		{
+			if (te->section == SECTION_DATA)
+			{
+				if (last_data_te)
+					add_dependency(te, last_data_te->dumpId);
+
+				last_data_te = te;
+			}
+		}
+	}
+
+
+	/*
 	 * At this point we start to build the revDeps reverse-dependency arrays,
 	 * so all changes of dependencies must be complete.
 	 */
@@ -4341,6 +4395,25 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
 }
 
 /*
+ * Adds a dependency to a TocEntry.  We use this to add dependencies that are
+ * known to be missing in the archive and setup constraints on the order in
+ * which table data is restored during parallel restore when we are unable to
+ * reopen the archive file.
+ */
+static void
+add_dependency(TocEntry *te, DumpId refId)
+{
+	if (te->dependencies == NULL)
+		te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
+	else
+		te->dependencies = (DumpId *) pg_realloc(te->dependencies, sizeof(DumpId) * (te->nDeps + 1));
+
+	te->dependencies[te->nDeps] = refId;
+	te->nDeps++;
+	te->depCount++;
+}
+
+/*
  * Set the created flag on the DATA member corresponding to the given
  * TABLE member
  */
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index f7dc5be..4c9aa44 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -184,6 +184,11 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
 		ctx->dataStart = _getFilePos(AH, ctx);
 	}
 
+	/* clear the ReopenPtr if we can't actually reopen the file (say STDIN) */
+	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0 || !ctx->hasSeek)
+	{
+		AH->ReopenPtr = NULL;
+	}
 }
 
 /*
-- 
1.8.2.1

