This version of the patch should fix the "shared file descriptor" bug Russell Smith noticed. It also disables the 1/2 second sleep between forks, so performance on a small database (e.g. the regression database) is vastly improved.
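
For anyone who wants the gist without wading through the diff: the fd fix amounts to each worker re-opening the archive for itself after fork(), so it gets a private file descriptor and seek position instead of sharing the parent's, while the parent farms items out to a fixed set of slots and reaps children with wait(). Here is a tiny standalone sketch of roughly that pattern (illustrative only, not code from the patch; the file name, slot count and dummy work list are invented):

/*
 * Illustrative sketch only, not code from the patch.  It shows roughly the
 * shape of the fix: each worker re-opens the archive after fork() so it has
 * a private file descriptor and seek position rather than sharing the
 * parent's, and the parent hands out items via a fixed set of slots and
 * reaps children with wait().  File name, slot count and work list are
 * made up for the example.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define N_SLOTS 4

static void
worker(const char *path, long offset)
{
	/* private FILE *, private fd, private file position */
	FILE	   *fh = fopen(path, "rb");

	if (fh == NULL || fseek(fh, offset, SEEK_SET) != 0)
	{
		perror(path);
		exit(1);
	}
	/* ... restore the one TOC item that starts at 'offset' ... */
	fclose(fh);
	exit(0);
}

int
main(int argc, char **argv)
{
	const char *path = (argc > 1) ? argv[1] : "regression.dump";
	long		work[] = {0, 0, 0, 0, 0, 0};	/* dummy per-item offsets */
	int			n_work = 6;
	int			next = 0;
	int			running = 0;
	int			i;
	pid_t		slots[N_SLOTS];

	memset(slots, 0, sizeof(slots));

	while (next < n_work || running > 0)
	{
		int			free_slot = -1;
		int			status;
		pid_t		done;

		for (i = 0; i < N_SLOTS; i++)
			if (slots[i] == 0)
				free_slot = i;

		if (next < n_work && free_slot >= 0)
		{
			pid_t		pid = fork();

			if (pid < 0)
			{
				perror("fork");
				exit(1);
			}
			if (pid == 0)
				worker(path, work[next]);	/* child: never returns */

			/* parent: record the busy slot and keep dispatching */
			slots[free_slot] = pid;
			running++;
			next++;
			continue;
		}

		/* no free slot, or no work left to hand out: reap one child */
		done = wait(&status);
		if (done < 0)
		{
			perror("wait");
			exit(1);
		}
		for (i = 0; i < N_SLOTS; i++)
			if (slots[i] == done)
			{
				slots[i] = 0;
				running--;
			}
	}
	return 0;
}

With the patch itself you would drive this through the new -m/--multi-thread switch, e.g. something like

    pg_restore -d mydb -m 4 regression.dump

(the database name, thread count and file name above are just placeholders).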

cheers

andrew



Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h	13 Apr 2008 03:49:21 -0000	1.47
--- pg_backup.h	26 Sep 2008 15:15:38 -0000
***************
*** 123,128 ****
--- 123,130 ----
  	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
  										 * to stderr */
  	bool		single_txn;
+     int         number_of_threads;
+ 	bool        truncate_before_load;
  
  	bool	   *idWanted;		/* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);
  
  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);
  
  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c	5 Sep 2008 23:53:42 -0000	1.158
--- pg_backup_archiver.c	26 Sep 2008 15:15:39 -0000
***************
*** 27,38 ****
--- 27,50 ----
  
  #include <unistd.h>
  
+ #include <sys/types.h>
+ #include <sys/wait.h>
+ 
+ 
  #ifdef WIN32
  #include <io.h>
  #endif
  
  #include "libpq/libpq-fs.h"
  
+ typedef struct _parallel_slot 
+ {
+ 	pid_t   pid;
+ 	TocEntry *te;
+ 	DumpId  dumpId;
+ } ParallelSlot;
+ 
+ #define NO_SLOT (-1)
  
  const char *progname;
  
***************
*** 70,76 ****
--- 82,99 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+ 
  
+ static ArchiveHandle *GAH;
  
  /*
   *	Wrapper functions.
***************
*** 125,137 ****
  
  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
  	ArchiveHandle *AH = (ArchiveHandle *) AHX;
  	TocEntry   *te;
  	teReqs		reqs;
  	OutputContext sav;
- 	bool		defnDumped;
  
  	AH->ropt = ropt;
  	AH->stage = STAGE_INITIALIZING;
--- 148,529 ----
  
  /* Public */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+ 
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+ 	pid_t ret_child;
+ 	int n_slots = ropt->number_of_threads;
+ 	TocEntry *te;
+ 	teReqs    reqs;
+ 	
+ 
+ 	/* 	AH->debugLevel = 99; */
+ 	/* some routines that use ahlog() don't get passed AH */
+ 	GAH = AH;
+ 
+ 	ahlog(AH,1,"entering RestoreArchiveParallel\n");
+ 
+ 
+ 	slots = (ParallelSlot *) calloc(n_slots, sizeof(ParallelSlot));
+ 	AH->ropt = ropt;
+ 
+ 	if (ropt->create)
+ 		die_horribly(AH,modulename,
+ 					 "parallel restore is incompatible with --create\n");
+ 
+ 	if (ropt->dropSchema)
+ 		die_horribly(AH,modulename,
+ 					 "parallel restore is incompatible with --clean\n");
+ 
+ 	if (!ropt->useDB)
+ 		die_horribly(AH,modulename,
+ 					 "parallel restore requires direct database connection\n");
+ 
+ 
+ #ifndef HAVE_LIBZ
+ 
+ 	/* make sure we won't need (de)compression we haven't got */
+ 	if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ 	{
+ 		for (te = AH->toc->next; te != AH->toc; te = te->next)
+ 		{
+ 			reqs = _tocEntryRequired(te, ropt, false);
+ 			if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ 				die_horribly(AH, modulename, 
+ 							 "cannot restore from compressed archive (compression not supported in this installation)\n");
+ 		}
+ 	}
+ #endif
+ 
+ 	ahlog(AH, 1, "connecting to database for restore\n");
+ 	if (AH->version < K_VERS_1_3)
+ 		die_horribly(AH, modulename, 
+ 					 "direct database connections are not supported in pre-1.3 archives\n");
+ 
+ 	/* XXX Should get this from the archive */
+ 	AHX->minRemoteVersion = 070100;
+ 	AHX->maxRemoteVersion = 999999;
+ 
+ 	/* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+ 		InitDummyWantedList(AHX,ropt);
+ 	_fix_dependency_counts(AH);
+ 
+ 	/*
+ 	 * Since we're talking to the DB directly, don't send comments since they
+ 	 * obscure SQL when displaying errors
+ 	 */
+ 	AH->noTocComments = 1;
+ 
+ 	/* Do all the early stuff in a single connection in the parent.
+ 	 * There's no great point in running it in parallel and it will actually
+ 	 * run faster in a single connection because we avoid all the connection
+ 	 * and setup overhead, including the (now disabled) 0.5s sleep below.
+ 	 */
+ 	ConnectDatabase(AHX, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->requirePassword);
+ 
+ 	
+ 	/*
+ 	 * Establish important parameter values right away.
+ 	 */
+ 	_doSetFixedOutputState(AH);
+ 
+ 	while((next_work_item = get_next_work_item(AH)) != NULL)
+ 	{
+ 		/* XXX need to improve this test in case there is no table data */
+ 		/* need to test for indexes, FKs, PK, Unique, etc */
+ 		if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+ 			break;
+ 		(void) _restore_one_te(AH, next_work_item, ropt, false);
+ 
+ 		next_work_item->prestored = true;
+ 
+ 		_reduce_dependencies(AH,next_work_item);
+ 	}
+ 		
+ 
+ 	/* 
+ 	 * now close parent connection in prep for parallel step.
+ 	 */
+ 	PQfinish(AH->connection);
+ 	AH->connection = NULL;
+ 
+ 	/* main parent loop */
+ 
+ 	ahlog(AH,1,"entering main loop\n");
+ 
+     while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+ 		   (work_is_being_done(slots,n_slots)))
+ 	{
+ 		if (next_work_item != NULL && 
+ 			((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+ 		{
+ 			/* there is work still to do and a worker slot available */
+ 
+ 			pid_t child;
+ 
+ 			next_work_item->prestored = true;
+ 
+ 			child = fork();
+ 			if (child == 0)
+ 			{
+ 				prestore(AH,next_work_item);
+ 				/* should not happen ... we expect prestore to exit */
+ 				exit(1);
+ 			}
+ 			else if (child > 0)
+ 			{
+ 				slots[next_slot].pid = child;
+ 				slots[next_slot].te = next_work_item;
+ 				slots[next_slot].dumpId = next_work_item->dumpId;
+ 			}
+ 			else
+ 			{
+ 				/* XXX fork error - handle it! */
+ 			}
+ 			/* delay just long enough between forks to give the catalog some
+ 			 * breathing space. Without this sleep I got 
+ 			 * "tuple concurrently updated" errors.
+ 			 */
+ 			/* pg_usleep(500000); */
+ 			continue; /* in case the slots are not yet full */
+ 		}
+ 		/* if we get here there must be work being done */
+ 		ret_child = wait(&work_status);
+ 
+ 		if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+ 		{
+ 			mark_work_done(AH, ret_child, slots, n_slots);
+ 		}
+ 		else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+ 		{
+ 			int i;
+ 
+ 			for (i = 0; i < n_slots; i++)
+ 			{
+ 				if (slots[i].pid == ret_child)
+ 				{
+ 					_inhibit_data_for_failed_table(AH, slots[i].te);
+ 					break;
+ 				}
+ 			}
+ 			mark_work_done(AH, ret_child, slots, n_slots);
+ 		}
+ 		else
+ 		{
+ 			/* XXX something went wrong - deal with it */
+ 		}
+ 	}
+ 
+ 	/* 
+ 	 * now process the ACLs - no need to do this in parallel
+ 	 */
+ 
+ 	/* reconnect from parent */
+ 	ConnectDatabase(AHX, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->requirePassword);
+ 
+ 	/*
+ 	 * Scan TOC to output ownership commands and ACLs
+ 	 */
+ 	for (te = AH->toc->next; te != AH->toc; te = te->next)
+ 	{
+ 		AH->currentTE = te;
+ 
+ 		/* Work out what, if anything, we want from this entry */
+ 		reqs = _tocEntryRequired(te, ropt, true);
+ 
+ 		if ((reqs & REQ_SCHEMA) != 0)	/* We want the schema */
+ 		{
+ 			ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+ 				  te->desc, te->tag);
+ 			_printTocEntry(AH, te, ropt, false, true);
+ 		}
+ 	}
+ 
+ 	/* clean up */
+ 	PQfinish(AH->connection);
+ 	AH->connection = NULL;
+ 
+ }
+ 
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+ 	ahlog(GAH,1,"is work being done?\n");
+ 	while(n_slots--)
+ 	{
+ 		if (slot->pid > 0)
+ 			return true;
+ 		slot++;
+ 	}
+ 	ahlog(GAH,1,"work is not being done\n");
+ 	return false;
+ }
+ 
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+ 	int i;
+ 	
+ 	for (i = 0; i < n_slots; i++)
+ 	{
+ 		if (slots[i].pid == 0)
+ 		{
+ 			ahlog(GAH,1,"next available slot is %d\n",i);
+ 			return i;
+ 		}
+ 	}
+ 	ahlog(GAH,1,"No slot available\n");
+ 	return NO_SLOT;
+ }
+ 
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+ 	TocEntry *te;
+ 	teReqs    reqs;
+ 
+ 	/* just search from the top of the queue until we find an available item.
+ 	 * Note that the queue isn't reordered in the current implementation. If
+ 	 * we ever do reorder it, then certain code that processes entries from the
+ 	 * current item to the end of the queue will probably need to be 
+ 	 * re-examined.
+ 	 */
+ 
+ 	for (te = AH->toc->next; te != AH->toc; te = te->next)
+ 	{
+ 		if (!te->prestored && te->nDeps < 1)
+ 		{
+ 			/* make sure it's not an ACL */
+ 			reqs = _tocEntryRequired (te, AH->ropt, false);
+ 			if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+ 			{
+ 				ahlog(AH,1,"next item is %d\n",te->dumpId);
+ 				return te;
+ 			}
+ 		}
+ 	}
+ 	ahlog(AH,1,"No item ready\n");
+ 	return NULL;
+ }
+ 
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	RestoreOptions *ropt = AH->ropt;
+ 	int retval;
+ 
+ 	/* close and reopen the archive so we have a private copy that doesn't
+ 	 * stomp on anyone else's file pointer
+ 	 */
+ 
+ 	(AH->ReopenPtr)(AH);
+ 
+ 	ConnectDatabase((Archive *)AH, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->requirePassword);
+ 
+ 	/*
+ 	 * Establish important parameter values right away.
+ 	 */
+ 	_doSetFixedOutputState(AH);
+ 
+ 	retval = _restore_one_te(AH, te, ropt, true);
+ 
+ 	PQfinish(AH->connection);
+ 	exit(retval);
+ 	
+ }
+ 
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker, 
+ 			   ParallelSlot *slots, int n_slots)
+ {
+ 
+ 	TocEntry *te = NULL;
+ 	int i;
+ 
+ 	for (i = 0; i < n_slots; i++)
+ 	{
+ 		if (slots[i].pid == worker)
+ 		{
+ 			te = slots[i].te;
+ 			slots[i].pid = 0;
+ 			slots[i].te = NULL;
+ 			slots[i].dumpId = 0;
+ 			break;
+ 		}
+ 	}
+ 
+ 	/* Assert (te != NULL); */
+ 
+ 	_reduce_dependencies(AH,te);
+ 
+ 	
+ }
+ 
+ 
+ /*
+  * Make sure the head of each dependency chain is a live item 
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void 
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+ 	TocEntry * te;
+ 	RestoreOptions * ropt = AH->ropt;
+ 	
+ 	for (te = AH->toc->next; te != AH->toc; te = te->next)
+ 		if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+ 			_reduce_dependencies(AH,te);
+ }
+ 
+ static void 
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+ 	DumpId item = te->dumpId;
+ 	RestoreOptions * ropt = AH->ropt;
+ 	int i;
+ 
+ 	for (te = te->next; te != AH->toc; te = te->next)
+ 	{
+ 		for (i = 0; i < te->nDeps; i++)
+ 			if (te->dependencies[i] == item)
+ 			{
+ 				te->nDeps = te->nDeps - 1;
+ 				/*
+ 				 * If this item won't in fact be done, and is now at
+ 				 * 0 dependency count, we pretend it's been done and
+ 				 * recursively reduce the dependency counts of all the
+ 				 * things that depend on it.
+ 				 */
+ 				if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+ 					_reduce_dependencies(AH,te);
+ 
+ 				break;
+ 			}
+ 	}
+ 
+ }
+ 
+ 
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
  	ArchiveHandle *AH = (ArchiveHandle *) AHX;
  	TocEntry   *te;
  	teReqs		reqs;
  	OutputContext sav;
  
  	AH->ropt = ropt;
  	AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 563,582 ----
  		AH->noTocComments = 1;
  	}
  
+ #ifndef HAVE_LIBZ
+ 
+ 	/* make sure we won't need (de)compression we haven't got */
+ 	if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ 	{
+ 		for (te = AH->toc->next; te != AH->toc; te = te->next)
+ 		{
+ 			reqs = _tocEntryRequired(te, ropt, false);
+ 			if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ 				die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+ 		}
+ 	}
+ #endif
+ 
  	/*
  	 * Work out if we have an implied data-only restore. This can happen if
  	 * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
  	 */
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		AH->currentTE = te;
! 
! 		/* Work out what, if anything, we want from this entry */
! 		reqs = _tocEntryRequired(te, ropt, false);
! 
! 		/* Dump any relevant dump warnings to stderr */
! 		if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
! 		{
! 			if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
! 				write_msg(modulename, "warning from original dump file: %s\n", te->defn);
! 			else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
! 				write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
! 		}
! 
! 		defnDumped = false;
! 
! 		if ((reqs & REQ_SCHEMA) != 0)	/* We want the schema */
! 		{
! 			ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
! 
! 			_printTocEntry(AH, te, ropt, false, false);
! 			defnDumped = true;
! 
! 			/*
! 			 * If we could not create a table and --no-data-for-failed-tables
! 			 * was given, ignore the corresponding TABLE DATA
! 			 */
! 			if (ropt->noDataForFailedTables &&
! 				AH->lastErrorTE == te &&
! 				strcmp(te->desc, "TABLE") == 0)
! 			{
! 				TocEntry   *tes;
! 
! 				ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
! 					  te->tag);
! 
! 				for (tes = te->next; tes != AH->toc; tes = tes->next)
! 				{
! 					if (strcmp(tes->desc, "TABLE DATA") == 0 &&
! 						strcmp(tes->tag, te->tag) == 0 &&
! 						strcmp(tes->namespace ? tes->namespace : "",
! 							   te->namespace ? te->namespace : "") == 0)
! 					{
! 						/* mark it unwanted */
! 						ropt->idWanted[tes->dumpId - 1] = false;
! 						break;
! 					}
! 				}
! 			}
! 
! 			/* If we created a DB, connect to it... */
! 			if (strcmp(te->desc, "DATABASE") == 0)
! 			{
! 				ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
! 				_reconnectToDB(AH, te->tag);
! 			}
! 		}
! 
! 		/*
! 		 * If we have a data component, then process it
! 		 */
! 		if ((reqs & REQ_DATA) != 0)
! 		{
! 			/*
! 			 * hadDumper will be set if there is genuine data component for
! 			 * this node. Otherwise, we need to check the defn field for
! 			 * statements that need to be executed in data-only restores.
! 			 */
! 			if (te->hadDumper)
! 			{
! 				/*
! 				 * If we can output the data, then restore it.
! 				 */
! 				if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
! 				{
! #ifndef HAVE_LIBZ
! 					if (AH->compression != 0)
! 						die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
! 
! 					_printTocEntry(AH, te, ropt, true, false);
! 
! 					if (strcmp(te->desc, "BLOBS") == 0 ||
! 						strcmp(te->desc, "BLOB COMMENTS") == 0)
! 					{
! 						ahlog(AH, 1, "restoring %s\n", te->desc);
! 
! 						_selectOutputSchema(AH, "pg_catalog");
! 
! 						(*AH->PrintTocDataPtr) (AH, te, ropt);
! 					}
! 					else
! 					{
! 						_disableTriggersIfNecessary(AH, te, ropt);
! 
! 						/* Select owner and schema as necessary */
! 						_becomeOwner(AH, te);
! 						_selectOutputSchema(AH, te->namespace);
! 
! 						ahlog(AH, 1, "restoring data for table \"%s\"\n",
! 							  te->tag);
! 
! 						/*
! 						 * If we have a copy statement, use it. As of V1.3,
! 						 * these are separate to allow easy import from
! 						 * withing a database connection. Pre 1.3 archives can
! 						 * not use DB connections and are sent to output only.
! 						 *
! 						 * For V1.3+, the table data MUST have a copy
! 						 * statement so that we can go into appropriate mode
! 						 * with libpq.
! 						 */
! 						if (te->copyStmt && strlen(te->copyStmt) > 0)
! 						{
! 							ahprintf(AH, "%s", te->copyStmt);
! 							AH->writingCopyData = true;
! 						}
! 
! 						(*AH->PrintTocDataPtr) (AH, te, ropt);
! 
! 						AH->writingCopyData = false;
! 
! 						_enableTriggersIfNecessary(AH, te, ropt);
! 					}
! 				}
! 			}
! 			else if (!defnDumped)
! 			{
! 				/* If we haven't already dumped the defn part, do so now */
! 				ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
! 				_printTocEntry(AH, te, ropt, false, false);
! 			}
! 		}
! 	}							/* end loop over TOC entries */
  
  	/*
  	 * Scan TOC again to output ownership commands and ACLs
--- 676,683 ----
  	 */
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		(void) _restore_one_te(AH, te, ropt, false);
! 	}						
  
  	/*
  	 * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 725,905 ----
  	}
  }
  
+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te, 
+ 				RestoreOptions *ropt, bool is_parallel)
+ {
+ 	teReqs		reqs;
+ 	bool		defnDumped;
+ 	int         retval = 0;
+ 
+ 	AH->currentTE = te;
+ 
+ 	/* Work out what, if anything, we want from this entry */
+ 	reqs = _tocEntryRequired(te, ropt, false);
+ 
+ 	/* Dump any relevant dump warnings to stderr */
+ 	if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+ 	{
+ 		if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+ 			write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+ 		else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+ 			write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+ 	}
+ 
+ 	defnDumped = false;
+ 
+ 	if ((reqs & REQ_SCHEMA) != 0)	/* We want the schema */
+ 	{
+ 		ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+ 
+ 		_printTocEntry(AH, te, ropt, false, false);
+ 		defnDumped = true;
+ 
+ 		/*
+ 		 * If we could not create a table and --no-data-for-failed-tables
+ 		 * was given, ignore the corresponding TABLE DATA
+ 		 *
+ 		 * For the parallel case this must be done in the parent, so we just
+ 		 * set a return value.
+ 		 */
+ 		if (ropt->noDataForFailedTables &&
+ 			AH->lastErrorTE == te &&
+ 			strcmp(te->desc, "TABLE") == 0)
+ 		{
+ 			if (is_parallel)
+ 				retval = 1;
+ 			else
+ 				_inhibit_data_for_failed_table(AH,te);
+ 		}
+ 
+ 		/* If we created a DB, connect to it... */
+ 		/* won't happen in parallel restore */
+ 		if (strcmp(te->desc, "DATABASE") == 0)
+ 		{
+ 			ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+ 			_reconnectToDB(AH, te->tag);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * If we have a data component, then process it
+ 	 */
+ 	if ((reqs & REQ_DATA) != 0)
+ 	{
+ 		/*
+ 		 * hadDumper will be set if there is genuine data component for
+ 		 * this node. Otherwise, we need to check the defn field for
+ 		 * statements that need to be executed in data-only restores.
+ 		 */
+ 		if (te->hadDumper)
+ 		{
+ 			/*
+ 			 * If we can output the data, then restore it.
+ 			 */
+ 			if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+ 			{
+ 				_printTocEntry(AH, te, ropt, true, false);
+ 
+ 				if (strcmp(te->desc, "BLOBS") == 0 ||
+ 					strcmp(te->desc, "BLOB COMMENTS") == 0)
+ 				{
+ 					ahlog(AH, 1, "restoring %s\n", te->desc);
+ 
+ 					_selectOutputSchema(AH, "pg_catalog");
+ 
+ 					(*AH->PrintTocDataPtr) (AH, te, ropt);
+ 				}
+ 				else
+ 				{
+ 					_disableTriggersIfNecessary(AH, te, ropt);
+ 
+ 					/* Select owner and schema as necessary */
+ 					_becomeOwner(AH, te);
+ 					_selectOutputSchema(AH, te->namespace);
+ 
+ 					ahlog(AH, 1, "restoring data for table \"%s\"\n",
+ 						  te->tag);
+ 
+ 					if (ropt->truncate_before_load)
+ 					{
+ 						if (AH->connection)
+ 							StartTransaction(AH);
+ 						else
+ 							ahprintf(AH, "BEGIN;\n\n");
+ 
+ 						ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+ 								 fmtId(te->tag));
+ 					}
+ 
+ 					/*
+ 					 * If we have a copy statement, use it. As of V1.3,
+ 					 * these are separate to allow easy import from
+ 					 * withing a database connection. Pre 1.3 archives can
+ 					 * not use DB connections and are sent to output only.
+ 					 *
+ 					 * For V1.3+, the table data MUST have a copy
+ 					 * statement so that we can go into appropriate mode
+ 					 * with libpq.
+ 					 */
+ 					if (te->copyStmt && strlen(te->copyStmt) > 0)
+ 					{
+ 						ahprintf(AH, "%s", te->copyStmt);
+ 						AH->writingCopyData = true;
+ 					}
+ 
+ 					(*AH->PrintTocDataPtr) (AH, te, ropt);
+ 
+ 					AH->writingCopyData = false;
+ 
+ 					if (ropt->truncate_before_load)
+ 					{
+ 						if (AH->connection)
+ 							CommitTransaction(AH);
+ 						else
+ 							ahprintf(AH, "COMMIT;\n\n");
+ 					}
+ 					
+ 
+ 					_enableTriggersIfNecessary(AH, te, ropt);
+ 				}
+ 			}
+ 		}
+ 		else if (!defnDumped)
+ 		{
+ 			/* If we haven't already dumped the defn part, do so now */
+ 			ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+ 			_printTocEntry(AH, te, ropt, false, false);
+ 		}
+ 	}
+ 
+ 	return retval;
+ }
+ 
+ static void 
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+ 	TocEntry   *tes;
+ 	RestoreOptions *ropt = AH->ropt;
+ 
+ 	ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+ 		  te->tag);
+ 
+ 	for (tes = te->next; tes != AH->toc; tes = tes->next)
+ 	{
+ 		if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+ 			strcmp(tes->tag, te->tag) == 0 &&
+ 			strcmp(tes->namespace ? tes->namespace : "",
+ 				   te->namespace ? te->namespace : "") == 0)
+ 		{
+ 			/* mark it unwanted */
+ 			ropt->idWanted[tes->dumpId - 1] = false;
+ 
+ 			_reduce_dependencies(AH, tes);
+ 			break;
+ 		}
+ 	}
+ }
+ 
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h	7 Nov 2007 12:24:24 -0000	1.76
--- pg_backup_archiver.h	26 Sep 2008 15:15:39 -0000
***************
*** 99,104 ****
--- 99,105 ----
  struct _restoreList;
  
  typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
  typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
  
  typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
  	WriteBufPtr WriteBufPtr;	/* Write a buffer of output to the archive */
  	ReadBufPtr ReadBufPtr;		/* Read a buffer of input from the archive */
  	ClosePtr ClosePtr;			/* Close the archive */
+ 	ReopenPtr ReopenPtr;			/* Reopen the archive */
  	WriteExtraTocPtr WriteExtraTocPtr;	/* Write extra TOC entry data
  										 * associated with the current archive
  										 * format */
***************
*** 231,236 ****
--- 233,239 ----
  	char	   *archdbname;		/* DB name *read* from archive */
  	bool		requirePassword;
  	PGconn	   *connection;
+ 	char       *cachepw;		/* password cached from first connection, for reconnects */
  	int			connectToDB;	/* Flag to indicate if direct DB connection is
  								 * required */
  	bool		writingCopyData;	/* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
  	DumpId		dumpId;
  	bool		hadDumper;		/* Archiver was passed a dumper routine (used
  								 * in restore) */
+ 	bool        prestored;      /* already restored or dispatched (parallel restore) */
  	char	   *tag;			/* index tag */
  	char	   *namespace;		/* null or empty string if not in a schema */
  	char	   *tablespace;		/* null if not in a tablespace; empty string
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c	28 Oct 2007 21:55:52 -0000	1.40
--- pg_backup_custom.c	26 Sep 2008 15:15:39 -0000
***************
*** 40,45 ****
--- 40,46 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
  static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
+ 	AH->ReopenPtr = _ReopenArchive;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
  	AH->FH = NULL;
  }
  
+ static void 
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 	pgoff_t		tpos;
+ 
+ 	if (AH->mode == archModeWrite)
+ 	{
+ 		die_horribly(AH, modulename, "can only reopen input archives\n");
+ 	}
+ 	else if ((! AH->fSpec) ||  strcmp(AH->fSpec, "") == 0)
+ 	{
+ 		die_horribly(AH, modulename, "cannot reopen stdin\n");
+ 	}
+ 
+ 	tpos = ftello(AH->FH);
+ 
+ 	if (fclose(AH->FH) != 0)
+ 		die_horribly(AH, modulename, "could not close archive file: %s\n", 
+ 					 strerror(errno));
+ 
+ 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+ 	if (!AH->FH)
+ 		die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+ 					 AH->fSpec, strerror(errno));
+ 
+ 	if (ctx->hasSeek)
+ 	{
+ 		fseeko(AH->FH, tpos, SEEK_SET);
+ 	}
+ 	else
+ 	{
+ 		die_horribly(AH, modulename, "cannot reopen non-seekable file\n");
+ 	}
+ 	
+ }
+ 
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c	16 Aug 2008 02:25:06 -0000	1.80
--- pg_backup_db.c	26 Sep 2008 15:15:39 -0000
***************
*** 206,220 ****
  	if (AH->connection)
  		die_horribly(AH, modulename, "already connected to a database\n");
  
! 	if (reqPwd)
  	{
  		password = simple_prompt("Password: ", 100, false);
  		if (password == NULL)
  			die_horribly(AH, modulename, "out of memory\n");
  		AH->requirePassword = true;
  	}
  	else
  		AH->requirePassword = false;
  
  	/*
  	 * Start the connection.  Loop until we have a password if requested by
--- 206,226 ----
  	if (AH->connection)
  		die_horribly(AH, modulename, "already connected to a database\n");
  
! 	if (reqPwd && AH->cachepw == NULL)
  	{
  		password = simple_prompt("Password: ", 100, false);
  		if (password == NULL)
  			die_horribly(AH, modulename, "out of memory\n");
  		AH->requirePassword = true;
  	}
+ 	else if (reqPwd)
+ 	{
+ 		password = AH->cachepw;
+ 	}
  	else
+ 	{
  		AH->requirePassword = false;
+ 	}
  
  	/*
  	 * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
  	} while (new_pass);
  
  	if (password)
! 		free(password);
  
  	/* check to see that the backend connection was successfully made */
  	if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 247,253 ----
  	} while (new_pass);
  
  	if (password)
! 		AH->cachepw = password;
  
  	/* check to see that the backend connection was successfully made */
  	if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c	28 Oct 2007 21:55:52 -0000	1.34
--- pg_backup_files.c	26 Sep 2008 15:15:39 -0000
***************
*** 87,92 ****
--- 87,93 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
+ 	AH->ReopenPtr = NULL;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c	15 Nov 2007 21:14:41 -0000	1.62
--- pg_backup_tar.c	26 Sep 2008 15:15:39 -0000
***************
*** 143,148 ****
--- 143,149 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
+ 	AH->ReopenPtr = NULL;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c	13 Apr 2008 03:49:22 -0000	1.88
--- pg_restore.c	26 Sep 2008 15:15:39 -0000
***************
*** 78,83 ****
--- 78,84 ----
  	static int	no_data_for_failed_tables = 0;
  	static int  outputNoTablespaces = 0;
  	static int	use_setsessauth = 0;
+ 	static int  truncate_before_load = 0;
  
  	struct option cmdopts[] = {
  		{"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
  		{"ignore-version", 0, NULL, 'i'},
  		{"index", 1, NULL, 'I'},
  		{"list", 0, NULL, 'l'},
+ 		{"multi-thread", 1, NULL, 'm'},
  		{"no-privileges", 0, NULL, 'x'},
  		{"no-acl", 0, NULL, 'x'},
  		{"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
  		{"disable-triggers", no_argument, &disable_triggers, 1},
  		{"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
  		{"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+ 		{"truncate-before-load", no_argument, &truncate_before_load, 1},
  		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
  
  		{NULL, 0, NULL, 0}
***************
*** 139,145 ****
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
  							cmdopts, NULL)) != -1)
  	{
  		switch (c)
--- 142,148 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
  							cmdopts, NULL)) != -1)
  	{
  		switch (c)
***************
*** 182,187 ****
--- 185,194 ----
  				opts->tocFile = strdup(optarg);
  				break;
  
+ 			case 'm':
+ 				opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+ 				break;
+ 
  			case 'n':			/* Dump data for this schema only */
  				opts->schemaNames = strdup(optarg);
  				break;
***************
*** 262,268 ****
  				break;
  
  			case 0:
! 				/* This covers the long options equivalent to -X xxx. */
  				break;
  
  			case '1':			/* Restore data in a single transaction */
--- 269,278 ----
  				break;
  
  			case 0:
! 				/* 
! 				 * This covers the long options without a short equivalent, 
! 				 * including those equivalent to -X xxx. 
! 				 */
  				break;
  
  			case '1':			/* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
  	opts->noDataForFailedTables = no_data_for_failed_tables;
  	opts->noTablespace = outputNoTablespaces;
  	opts->use_setsessauth = use_setsessauth;
+ 	opts->truncate_before_load = truncate_before_load;
+ 
+ 	if (opts->single_txn)
+ 	{
+ 		if (opts->number_of_threads > 1)
+ 		{
+ 			write_msg(NULL, "single transaction is not compatible with multi-threading\n");
+ 			exit(1);
+ 		}
+ 		else if (opts->truncate_before_load)
+ 		{
+ 			write_msg(NULL, "single transaction is not compatible with truncate-before-load\n");
+ 			exit(1);
+ 		}
+ 	}
  
  	if (opts->formatName)
  	{
***************
*** 330,335 ****
--- 355,362 ----
  
  	AH = OpenArchive(inputFileSpec, opts->format);
  
+ 	/* XXX looks like we'll have to do sanity checks in the parallel archiver */
+ 
  	/* Let the archiver know how noisy to be */
  	AH->verbose = opts->verbose;
  
***************
*** 351,356 ****
--- 378,385 ----
  
  	if (opts->tocSummary)
  		PrintTOCSummary(AH, opts);
+ 	else if (opts->number_of_threads > 1)
+ 		RestoreArchiveParallel(AH, opts);
  	else
  		RestoreArchive(AH, opts);
  