Andrew Dunstan wrote:
>> This works better, but there is still something fishy: using the same
>> dump file I get a proper restore when running pg_restore normally. If,
>> however, I use -m for a parallel restore, only part of the database is
>> restored (in this case only 243 of the 709 tables) ...
> Yes, there are several funny things going on, including some stuff
> with dependencies. I'll have a new patch tomorrow with luck. Thanks
> for testing.
OK, in this version a whole heap of bugs are fixed, mainly those to do
with dependencies and saved state. I get identical row counts in the
source and destination now, quite reliably.
cheers
andrew
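
For anyone trying to follow the patch, the core scheduling idea is fairly
simple: every TOC entry carries a count of unsatisfied dependencies
(depCount), the parent forks a worker for any entry whose count has reached
zero whenever a slot is free, and each time wait() reaps a child it reduces
the counts of everything that depended on the finished entry. Here is a
minimal standalone sketch of that loop, purely for illustration - it is not
code from the patch, and the item list, slot handling and fake "restore"
step are all made up:

/*
 * Illustrative sketch only: schedule items by dependency count using a
 * small pool of forked workers (build with cc -std=c99).
 */
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#define N_ITEMS 4
#define N_SLOTS 2

typedef struct
{
    const char *desc;
    int         deps[N_ITEMS];  /* indexes of items this one depends on */
    int         ndeps;
    int         depcount;       /* unsatisfied dependencies remaining */
    bool        dispatched;
} Item;

static Item items[N_ITEMS] = {
    {"TABLE a",      {0}, 0, 0, false},
    {"TABLE b",      {0}, 0, 0, false},
    {"TABLE DATA a", {0}, 1, 1, false},   /* depends on item 0 */
    {"INDEX a_idx",  {2}, 1, 1, false},   /* depends on item 2 */
};

static pid_t worker_pid[N_SLOTS];   /* 0 = slot free */
static int   worker_item[N_SLOTS];

static int next_ready_item(void)
{
    for (int i = 0; i < N_ITEMS; i++)
        if (!items[i].dispatched && items[i].depcount == 0)
            return i;
    return -1;
}

static int free_slot(void)
{
    for (int s = 0; s < N_SLOTS; s++)
        if (worker_pid[s] == 0)
            return s;
    return -1;
}

static bool work_in_progress(void)
{
    for (int s = 0; s < N_SLOTS; s++)
        if (worker_pid[s] != 0)
            return true;
    return false;
}

int main(void)
{
    int     item, slot;

    while ((item = next_ready_item()) >= 0 || work_in_progress())
    {
        if (item >= 0 && (slot = free_slot()) >= 0)
        {
            pid_t   pid = fork();

            if (pid == 0)
            {
                /* child: pretend to restore the item, then report success */
                printf("[%d] restoring %s\n", (int) getpid(), items[item].desc);
                sleep(1);
                _exit(0);
            }
            items[item].dispatched = true;
            worker_pid[slot] = pid;
            worker_item[slot] = item;
            continue;           /* keep forking until the slots are full */
        }

        /* nothing startable, or no free slot: reap one child */
        pid_t   finished = wait(NULL);

        for (slot = 0; slot < N_SLOTS; slot++)
        {
            if (worker_pid[slot] != finished)
                continue;
            worker_pid[slot] = 0;
            /* the finished item satisfies one dependency of its dependents */
            for (int i = 0; i < N_ITEMS; i++)
                for (int d = 0; d < items[i].ndeps; d++)
                    if (items[i].deps[d] == worker_item[slot])
                        items[i].depcount--;
            break;
        }
    }
    return 0;
}

(Obviously the real patch has to do much more - reconnect to the database
in each child, handle failures, and keep the TOC dependencies consistent -
but that is the shape of the main loop.)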
Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47
--- pg_backup.h 29 Sep 2008 02:43:51 -0000
***************
*** 123,128 ****
--- 123,130 ----
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
+ int number_of_threads;
+ bool truncate_before_load;
bool *idWanted; /* array showing which dump IDs to emit */
} RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
extern void CloseArchive(Archive *AH);
extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);
/* Open an existing archive */
extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158
--- pg_backup_archiver.c 29 Sep 2008 02:43:52 -0000
***************
*** 27,38 ****
--- 27,50 ----
#include <unistd.h>
+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
#ifdef WIN32
#include <io.h>
#endif
#include "libpq/libpq-fs.h"
+ typedef struct _parallel_slot
+ {
+ pid_t pid;
+ TocEntry *te;
+ DumpId dumpId;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)
const char *progname;
***************
*** 70,76 ****
--- 82,99 ----
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+
+ static ArchiveHandle *GAH;
/*
* Wrapper functions.
***************
*** 125,137 ****
/* Public */
void
RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ArchiveHandle *AH = (ArchiveHandle *) AHX;
TocEntry *te;
teReqs reqs;
OutputContext sav;
- bool defnDumped;
AH->ropt = ropt;
AH->stage = STAGE_INITIALIZING;
--- 148,579 ----
/* Public */
void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ ParallelSlot *slots;
+ int next_slot;
+ TocEntry *next_work_item = NULL;
+ int work_status;
+ pid_t ret_child;
+ int n_slots = ropt->number_of_threads;
+ TocEntry *te;
+ teReqs reqs;
+
+
+ /* AH->debugLevel = 99; */
+ /* some routines that use ahlog() don't get passed AH */
+ GAH = AH;
+
+ ahlog(AH,1,"entering RestoreArchiveParallel\n");
+
+
+ slots = (ParallelSlot *) calloc(n_slots, sizeof(ParallelSlot));
+ AH->ropt = ropt;
+
+ /*
+ if (ropt->create)
+ die_horribly(AH,modulename,
+ "parallel restore is incompatible with --create\n");
+ */
+
+
+ if (ropt->dropSchema)
+ die_horribly(AH,modulename,
+ "parallel restore is incompatible with --clean\n");
+
+ if (!ropt->useDB)
+ die_horribly(AH,modulename,
+ "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+ /* make sure we won't need (de)compression we haven't got */
+ if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ reqs = _tocEntryRequired(te, ropt, false);
+ if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ die_horribly(AH, modulename,
+ "cannot restore from compressed archive (compression not supported in this installation)\n");
+ }
+ }
+ #endif
+
+ ahlog(AH, 1, "connecting to database for restore\n");
+ if (AH->version < K_VERS_1_3)
+ die_horribly(AH, modulename,
+ "direct database connections are not supported in pre-1.3 archives\n");
+
+ /* XXX Should get this from the archive */
+ AHX->minRemoteVersion = 070100;
+ AHX->maxRemoteVersion = 999999;
+
+ /* correct dependency counts in case we're doing a partial restore */
+ if (ropt->idWanted == NULL)
+ InitDummyWantedList(AHX,ropt);
+ _fix_dependency_counts(AH);
+
+ /*
+ * Since we're talking to the DB directly, don't send comments since they
+ * obscure SQL when displaying errors
+ */
+ AH->noTocComments = 1;
+
+ /* Do all the early stuff in a single connection in the parent.
+ * There's no great point in running it in parallel and it will actually
+ * run faster in a single connection because we avoid all the connection
+ * and setup overhead, including the 0.5s sleep below.
+ */
+ ConnectDatabase(AHX, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+
+ /*
+ * Establish important parameter values right away.
+ */
+ _doSetFixedOutputState(AH);
+
+ while((next_work_item = get_next_work_item(AH)) != NULL)
+ {
+ /* XXX need to improve this test in case there is no table data */
+ /* need to test for indexes, FKs, PK, Unique, etc */
+ if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+ break;
+ (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+ next_work_item->prestored = true;
+
+ _reduce_dependencies(AH,next_work_item);
+ }
+
+
+ /*
+ * now close parent connection in prep for parallel step.
+ */
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+
+ /* blow away any preserved state from the previous connection */
+
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = strdup("");
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = strdup("");
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ AH->currTablespace = NULL;
+ AH->currWithOids = -1;
+
+ /* main parent loop */
+
+ ahlog(AH,1,"entering main loop\n");
+
+ while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+ (work_is_being_done(slots,n_slots)))
+ {
+ if (next_work_item != NULL &&
+ ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+ {
+ /* there is work still to do and a worker slot available */
+
+ pid_t child;
+
+ next_work_item->prestored = true;
+
+ child = fork();
+ if (child == 0)
+ {
+ prestore(AH,next_work_item);
+ /* should not happen ... we expect prestore to exit */
+ exit(1);
+ }
+ else if (child > 0)
+ {
+ slots[next_slot].pid = child;
+ slots[next_slot].te = next_work_item;
+ slots[next_slot].dumpId = next_work_item->dumpId;
+ }
+ else
+ {
+ /* XXX fork error - handle it! */
+ }
+ /* delay just long enough between forks to give the catalog some
+ * breathing space. Without this sleep I got
+ * "tuple concurrently updated" errors.
+ */
+ /* pg_usleep(500000); */
+ continue; /* in case the slots are not yet full */
+ }
+ /* if we get here there must be work being done */
+ ret_child = wait(&work_status);
+
+ if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+ {
+ mark_work_done(AH, ret_child, slots, n_slots);
+ }
+ else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+ {
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ if (slots[i].pid == ret_child)
+ {
+ _inhibit_data_for_failed_table(AH, slots[i].te);
+ break;
+ }
+ mark_work_done(AH, ret_child, slots, n_slots);
+ }
+ else
+ {
+ /* XXX something went wrong - deal with it */
+ }
+ }
+
+ /*
+ * now process the ACLs - no need to do this in parallel
+ */
+
+ /* reconnect from parent */
+ ConnectDatabase(AHX, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+ /*
+ * Scan TOC to output ownership commands and ACLs
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ AH->currentTE = te;
+
+ /* Work out what, if anything, we want from this entry */
+ reqs = _tocEntryRequired(te, ropt, true);
+
+ if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ {
+ ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+ te->desc, te->tag);
+ _printTocEntry(AH, te, ropt, false, true);
+ }
+ }
+
+ /* clean up */
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+
+ }
+
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+ ahlog(GAH,1,"is work being done?\n");
+ while(n_slots--)
+ {
+ if (slot->pid > 0)
+ return true;
+ slot++;
+ }
+ ahlog(GAH,1,"work is not being done\n");
+ return false;
+ }
+
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ {
+ if (slots[i].pid == 0)
+ {
+ ahlog(GAH,1,"available slots is %d\n",i);
+ return i;
+ }
+ }
+ ahlog(GAH,1,"No slot available\n");
+ return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+ TocEntry *te;
+ teReqs reqs;
+
+ /* just search from the top of the queue until we find an available item.
+ * Note that the queue isn't reordered in the current implementation. If
+ * we ever do reorder it, then certain code that processes entries from the
+ * current item to the end of the queue will probably need to be
+ * re-examined.
+ */
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (!te->prestored && te->depCount < 1)
+ {
+ /* make sure it's not an ACL */
+ reqs = _tocEntryRequired (te, AH->ropt, false);
+ if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+ {
+ ahlog(AH,1,"next item is %d\n",te->dumpId);
+ return te;
+ }
+ }
+ }
+ ahlog(AH,1,"No item ready\n");
+ return NULL;
+ }
+
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+ RestoreOptions *ropt = AH->ropt;
+ int retval;
+
+ /* close and reopen the archive so we have a private copy that doesn't
+ * stomp on anyone else's file pointer
+ */
+
+ (AH->ReopenPtr)(AH);
+
+ ConnectDatabase((Archive *)AH, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+ /*
+ * Establish important parameter values right away.
+ */
+ _doSetFixedOutputState(AH);
+
+ retval = _restore_one_te(AH, te, ropt, true);
+
+ PQfinish(AH->connection);
+ exit(retval);
+
+ }
+
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+ ParallelSlot *slots, int n_slots)
+ {
+
+ TocEntry *te = NULL;
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ {
+ if (slots[i].pid == worker)
+ {
+ te = slots[i].te;
+ slots[i].pid = 0;
+ slots[i].te = NULL;
+ slots[i].dumpId = 0;
+ break;
+ }
+ }
+
+ /* Assert (te != NULL); */
+
+ _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+ * Make sure the head of each dependency chain is a live item
+ *
+ * Once this is established the property will be maintained by
+ * _reduce_dependencies called as items are done.
+ */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+ TocEntry * te;
+ RestoreOptions * ropt = AH->ropt;
+ bool * RealDumpIds;
+ int i;
+
+
+ RealDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ RealDumpIds[te->dumpId-1] = true;
+ if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+ _reduce_dependencies(AH,te);
+ }
+
+ /*
+ * It is possible that the dependency lists include items that are
+ * not in the archive at all. Reduce the depcounts so those get
+ * ignored.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ for (i = 0; i < te->nDeps; i++)
+ if (!RealDumpIds[te->dependencies[i]-1])
+ te->depCount--;
+ }
+
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+ DumpId item = te->dumpId;
+ RestoreOptions * ropt = AH->ropt;
+ int i;
+
+ for (te = te->next; te != AH->toc; te = te->next)
+ {
+ if (te->nDeps == 0)
+ continue;
+
+ for (i = 0; i < te->nDeps; i++)
+ if (te->dependencies[i] == item)
+ te->depCount = te->depCount - 1;
+
+ /* If this is a table data item we are making available,
+ * make anything that depends on the table depend on this item
+ * instead of the table definition, so it
+ * doesn't get scheduled until the data is loaded.
+ * This has to be done now, before the main loop gets to anything
+ * further down the list.
+ */
+ if (te->depCount == 0 && strcmp(te->desc,"TABLE DATA") == 0)
+ {
+ TocEntry *tes;
+ int j;
+ for (tes = te->next; tes != AH->toc; tes = tes->next)
+ for (j = 0; j < tes->nDeps; j++)
+ if (tes->dependencies[j] == item)
+ tes->dependencies[j] = te->dumpId;
+ }
+
+ /*
+ * If this item won't in fact be done, and is now at
+ * 0 dependency count, we pretend it's been done and
+ * reduce the dependency counts of all the things that
+ * depend on it, by a recursive call
+ */
+ if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+ _reduce_dependencies(AH,te);
+ }
+
+ }
+
+
+ /* Public */
+ void
RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ArchiveHandle *AH = (ArchiveHandle *) AHX;
TocEntry *te;
teReqs reqs;
OutputContext sav;
AH->ropt = ropt;
AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 613,632 ----
AH->noTocComments = 1;
}
+ #ifndef HAVE_LIBZ
+
+ /* make sure we won't need (de)compression we haven't got */
+ if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ reqs = _tocEntryRequired(te, ropt, false);
+ if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+ }
+ }
+ #endif
+
/*
* Work out if we have an implied data-only restore. This can happen if
* the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! AH->currentTE = te;
!
! /* Work out what, if anything, we want from this entry */
! reqs = _tocEntryRequired(te, ropt, false);
!
! /* Dump any relevant dump warnings to stderr */
! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
! {
! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
! write_msg(modulename, "warning from original dump file: %s\n", te->defn);
! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
! }
!
! defnDumped = false;
!
! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
! {
! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
! _printTocEntry(AH, te, ropt, false, false);
! defnDumped = true;
!
! /*
! * If we could not create a table and --no-data-for-failed-tables
! * was given, ignore the corresponding TABLE DATA
! */
! if (ropt->noDataForFailedTables &&
! AH->lastErrorTE == te &&
! strcmp(te->desc, "TABLE") == 0)
! {
! TocEntry *tes;
!
! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
! te->tag);
!
! for (tes = te->next; tes != AH->toc; tes = tes->next)
! {
! if (strcmp(tes->desc, "TABLE DATA") == 0 &&
! strcmp(tes->tag, te->tag) == 0 &&
! strcmp(tes->namespace ? tes->namespace : "",
! te->namespace ? te->namespace : "") == 0)
! {
! /* mark it unwanted */
! ropt->idWanted[tes->dumpId - 1] = false;
! break;
! }
! }
! }
!
! /* If we created a DB, connect to it... */
! if (strcmp(te->desc, "DATABASE") == 0)
! {
! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
! _reconnectToDB(AH, te->tag);
! }
! }
!
! /*
! * If we have a data component, then process it
! */
! if ((reqs & REQ_DATA) != 0)
! {
! /*
! * hadDumper will be set if there is genuine data component for
! * this node. Otherwise, we need to check the defn field for
! * statements that need to be executed in data-only restores.
! */
! if (te->hadDumper)
! {
! /*
! * If we can output the data, then restore it.
! */
! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
! {
! #ifndef HAVE_LIBZ
! if (AH->compression != 0)
! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
! _printTocEntry(AH, te, ropt, true, false);
!
! if (strcmp(te->desc, "BLOBS") == 0 ||
! strcmp(te->desc, "BLOB COMMENTS") == 0)
! {
! ahlog(AH, 1, "restoring %s\n", te->desc);
!
! _selectOutputSchema(AH, "pg_catalog");
!
! (*AH->PrintTocDataPtr) (AH, te, ropt);
! }
! else
! {
! _disableTriggersIfNecessary(AH, te, ropt);
!
! /* Select owner and schema as necessary */
! _becomeOwner(AH, te);
! _selectOutputSchema(AH, te->namespace);
!
! ahlog(AH, 1, "restoring data for table \"%s\"\n",
! te->tag);
!
! /*
! * If we have a copy statement, use it. As of V1.3,
! * these are separate to allow easy import from
! * withing a database connection. Pre 1.3 archives can
! * not use DB connections and are sent to output only.
! *
! * For V1.3+, the table data MUST have a copy
! * statement so that we can go into appropriate mode
! * with libpq.
! */
! if (te->copyStmt && strlen(te->copyStmt) > 0)
! {
! ahprintf(AH, "%s", te->copyStmt);
! AH->writingCopyData = true;
! }
!
! (*AH->PrintTocDataPtr) (AH, te, ropt);
!
! AH->writingCopyData = false;
!
! _enableTriggersIfNecessary(AH, te, ropt);
! }
! }
! }
! else if (!defnDumped)
! {
! /* If we haven't already dumped the defn part, do so now */
! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
! _printTocEntry(AH, te, ropt, false, false);
! }
! }
! } /* end loop over TOC entries */
/*
* Scan TOC again to output ownership commands and ACLs
--- 726,733 ----
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! (void) _restore_one_te(AH, te, ropt, false);
! }
/*
* Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 775,955 ----
}
}
+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+ RestoreOptions *ropt, bool is_parallel)
+ {
+ teReqs reqs;
+ bool defnDumped;
+ int retval = 0;
+
+ AH->currentTE = te;
+
+ /* Work out what, if anything, we want from this entry */
+ reqs = _tocEntryRequired(te, ropt, false);
+
+ /* Dump any relevant dump warnings to stderr */
+ if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+ {
+ if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+ write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+ else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+ write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+ }
+
+ defnDumped = false;
+
+ if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ {
+ ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+ _printTocEntry(AH, te, ropt, false, false);
+ defnDumped = true;
+
+ /*
+ * If we could not create a table and --no-data-for-failed-tables
+ * was given, ignore the corresponding TABLE DATA
+ *
+ * For the parallel case this must be done in the parent, so we just
+ * set a return value.
+ */
+ if (ropt->noDataForFailedTables &&
+ AH->lastErrorTE == te &&
+ strcmp(te->desc, "TABLE") == 0)
+ {
+ if (is_parallel)
+ retval = 1;
+ else
+ _inhibit_data_for_failed_table(AH,te);
+ }
+
+ /* If we created a DB, connect to it... */
+ /* won't happen in parallel restore */
+ if (strcmp(te->desc, "DATABASE") == 0)
+ {
+ ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+ _reconnectToDB(AH, te->tag);
+ }
+ }
+
+ /*
+ * If we have a data component, then process it
+ */
+ if ((reqs & REQ_DATA) != 0)
+ {
+ /*
+ * hadDumper will be set if there is genuine data component for
+ * this node. Otherwise, we need to check the defn field for
+ * statements that need to be executed in data-only restores.
+ */
+ if (te->hadDumper)
+ {
+ /*
+ * If we can output the data, then restore it.
+ */
+ if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+ {
+ _printTocEntry(AH, te, ropt, true, false);
+
+ if (strcmp(te->desc, "BLOBS") == 0 ||
+ strcmp(te->desc, "BLOB COMMENTS") == 0)
+ {
+ ahlog(AH, 1, "restoring %s\n", te->desc);
+
+ _selectOutputSchema(AH, "pg_catalog");
+
+ (*AH->PrintTocDataPtr) (AH, te, ropt);
+ }
+ else
+ {
+ _disableTriggersIfNecessary(AH, te, ropt);
+
+ /* Select owner and schema as necessary */
+ _becomeOwner(AH, te);
+ _selectOutputSchema(AH, te->namespace);
+
+ ahlog(AH, 1, "restoring data for table \"%s\"\n",
+ te->tag);
+
+ if (ropt->truncate_before_load)
+ {
+ if (AH->connection)
+ StartTransaction(AH);
+ else
+ ahprintf(AH, "BEGIN;\n\n");
+
+ ahprintf(AH, "TRUNCATE TABLE %s;\n\n", fmtId(te->tag));
+ }
+
+ /*
+ * If we have a copy statement, use it. As of V1.3,
+ * these are separate to allow easy import from
+ * within a database connection. Pre 1.3 archives can
+ * not use DB connections and are sent to output only.
+ *
+ * For V1.3+, the table data MUST have a copy
+ * statement so that we can go into appropriate mode
+ * with libpq.
+ */
+ if (te->copyStmt && strlen(te->copyStmt) > 0)
+ {
+ ahprintf(AH, "%s", te->copyStmt);
+ AH->writingCopyData = true;
+ }
+
+ (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+ AH->writingCopyData = false;
+
+ if (ropt->truncate_before_load)
+ {
+ if (AH->connection)
+ CommitTransaction(AH);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+
+
+ _enableTriggersIfNecessary(AH, te, ropt);
+ }
+ }
+ }
+ else if (!defnDumped)
+ {
+ /* If we haven't already dumped the defn part, do so now */
+ ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+ _printTocEntry(AH, te, ropt, false, false);
+ }
+ }
+
+ return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+ TocEntry *tes;
+ RestoreOptions *ropt = AH->ropt;
+
+ ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+ te->tag);
+
+ for (tes = te->next; tes != AH->toc; tes = tes->next)
+ {
+ if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+ strcmp(tes->tag, te->tag) == 0 &&
+ strcmp(tes->namespace ? tes->namespace : "",
+ te->namespace ? te->namespace : "") == 0)
+ {
+ /* mark it unwanted */
+ ropt->idWanted[tes->dumpId - 1] = false;
+
+ _reduce_dependencies(AH, tes);
+ break;
+ }
+ }
+ }
+
/*
* Allocate a new RestoreOptions block.
* This is mainly so we can initialize it, but also for future expansion,
***************
*** 653,662 ****
while (te != AH->toc)
{
if (_tocEntryRequired(te, ropt, true) != 0)
! ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
te->catalogId.tableoid, te->catalogId.oid,
te->desc, te->namespace ? te->namespace : "-",
te->tag, te->owner);
te = te->next;
}
--- 1152,1167 ----
while (te != AH->toc)
{
if (_tocEntryRequired(te, ropt, true) != 0)
! {
! int i;
! ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps);
! for (i=0 ;i<te->nDeps; i++)
! ahprintf(AH, "%d ",te->dependencies[i]);
! ahprintf(AH, "] %u %u %s %s %s %s\n",
te->catalogId.tableoid, te->catalogId.oid,
te->desc, te->namespace ? te->namespace : "-",
te->tag, te->owner);
+ }
te = te->next;
}
***************
*** 1948,1965 ****
--- 2453,2473 ----
deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
te->dependencies = deps;
te->nDeps = depIdx;
+ te->depCount = depIdx;
}
else
{
free(deps);
te->dependencies = NULL;
te->nDeps = 0;
+ te->depCount = 0;
}
}
else
{
te->dependencies = NULL;
te->nDeps = 0;
+ te->depCount = 0;
}
if (AH->ReadExtraTocPtr)
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76
--- pg_backup_archiver.h 29 Sep 2008 02:43:52 -0000
***************
*** 99,104 ****
--- 99,105 ----
struct _restoreList;
typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */
ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */
ClosePtr ClosePtr; /* Close the archive */
+ ReopenPtr ReopenPtr; /* Reopen the archive */
WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data
* associated with the current archive
* format */
***************
*** 231,236 ****
--- 233,239 ----
char *archdbname; /* DB name *read* from archive */
bool requirePassword;
PGconn *connection;
+ char *cachepw;
int connectToDB; /* Flag to indicate if direct DB connection is
* required */
bool writingCopyData; /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
DumpId dumpId;
bool hadDumper; /* Archiver was passed a dumper routine (used
* in restore) */
+ bool prestored; /* keep track of parallel restore */
char *tag; /* index tag */
char *namespace; /* null or empty string if not in a schema */
char *tablespace; /* null if not in a tablespace; empty string
***************
*** 296,301 ****
--- 300,306 ----
char *copyStmt;
DumpId *dependencies; /* dumpIds of objects this one depends on */
int nDeps; /* number of dependencies */
+ int depCount; /* adjustable tally of dependencies */
DataDumperPtr dataDumper; /* Routine to dump data for object */
void *dataDumperArg; /* Arg for above routine */
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40
--- pg_backup_custom.c 29 Sep 2008 02:43:52 -0000
***************
*** 40,45 ****
--- 40,46 ----
static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = _ReopenArchive;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
AH->FH = NULL;
}
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ pgoff_t tpos;
+
+ if (AH->mode == archModeWrite)
+ {
+ die_horribly(AH,modulename,"can only reopen input archives\n");
+ }
+ else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0)
+ {
+ die_horribly(AH,modulename,"cannot reopen stdin\n");
+ }
+
+ tpos = ftello(AH->FH);
+
+ if (fclose(AH->FH) != 0)
+ die_horribly(AH, modulename, "could not close archive file: %s\n",
+ strerror(errno));
+
+ AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+ if (!AH->FH)
+ die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+ AH->fSpec, strerror(errno));
+
+ if (ctx->hasSeek)
+ {
+ fseeko(AH->FH, tpos, SEEK_SET);
+ }
+ else
+ {
+ die_horribly(AH,modulename,"cannot reopen non-seekable file\n");
+ }
+
+ }
+
/*--------------------------------------------------
* END OF FORMAT CALLBACKS
*--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80
--- pg_backup_db.c 29 Sep 2008 02:43:52 -0000
***************
*** 138,148 ****
ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);
! if (AH->requirePassword)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
}
do
--- 138,153 ----
ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);
! if (AH->requirePassword && AH->cachepw == NULL)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
+ AH->requirePassword = true;
+ }
+ else if (AH->requirePassword)
+ {
+ password = AH->cachepw;
}
do
***************
*** 174,180 ****
}
} while (new_pass);
! if (password)
free(password);
/* check for version mismatch */
--- 179,185 ----
}
} while (new_pass);
! if (password != AH->cachepw)
free(password);
/* check for version mismatch */
***************
*** 206,220 ****
if (AH->connection)
die_horribly(AH, modulename, "already connected to a database\n");
! if (reqPwd)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
AH->requirePassword = true;
}
else
AH->requirePassword = false;
/*
* Start the connection. Loop until we have a password if requested by
--- 211,231 ----
if (AH->connection)
die_horribly(AH, modulename, "already connected to a database\n");
! if (reqPwd && AH->cachepw == NULL)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
AH->requirePassword = true;
}
+ else if (reqPwd)
+ {
+ password = AH->cachepw;
+ }
else
+ {
AH->requirePassword = false;
+ }
/*
* Start the connection. Loop until we have a password if requested by
***************
*** 241,247 ****
} while (new_pass);
if (password)
! free(password);
/* check to see that the backend connection was successfully made */
if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 252,258 ----
} while (new_pass);
if (password)
! AH->cachepw = password;
/* check to see that the backend connection was successfully made */
if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34
--- pg_backup_files.c 29 Sep 2008 02:43:52 -0000
***************
*** 87,92 ****
--- 87,93 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = NULL;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62
--- pg_backup_tar.c 29 Sep 2008 02:43:52 -0000
***************
*** 143,148 ****
--- 143,149 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = NULL;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88
--- pg_restore.c 29 Sep 2008 02:43:52 -0000
***************
*** 78,83 ****
--- 78,84 ----
static int no_data_for_failed_tables = 0;
static int outputNoTablespaces = 0;
static int use_setsessauth = 0;
+ static int truncate_before_load = 0;
struct option cmdopts[] = {
{"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
{"ignore-version", 0, NULL, 'i'},
{"index", 1, NULL, 'I'},
{"list", 0, NULL, 'l'},
+ {"multi-thread", 1, NULL, 'm'},
{"no-privileges", 0, NULL, 'x'},
{"no-acl", 0, NULL, 'x'},
{"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
{"disable-triggers", no_argument, &disable_triggers, 1},
{"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
{"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+ {"truncate-before-load", no_argument, &truncate_before_load, 1},
{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
{NULL, 0, NULL, 0}
***************
*** 139,145 ****
}
}
! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
cmdopts, NULL)) != -1)
{
switch (c)
--- 142,148 ----
}
}
! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
cmdopts, NULL)) != -1)
{
switch (c)
***************
*** 182,187 ****
--- 185,194 ----
opts->tocFile = strdup(optarg);
break;
+ case 'm':
+ opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+ break;
+
case 'n': /* Dump data for this schema only */
opts->schemaNames = strdup(optarg);
break;
***************
*** 262,268 ****
break;
case 0:
! /* This covers the long options equivalent to -X xxx. */
break;
case '1': /* Restore data in a single transaction */
--- 269,278 ----
break;
case 0:
! /*
! * This covers the long options without a short equivalent,
! * including those equivalent to -X xxx.
! */
break;
case '1': /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
opts->noDataForFailedTables = no_data_for_failed_tables;
opts->noTablespace = outputNoTablespaces;
opts->use_setsessauth = use_setsessauth;
+ opts->truncate_before_load = truncate_before_load;
+
+ if (opts->single_txn)
+ {
+ if (opts->number_of_threads > 1)
+ {
+ write_msg(NULL, "single transaction not compatible with multi-threading\n");
+ exit(1);
+ }
+ else if (opts->truncate_before_load)
+ {
+ write_msg(NULL, "single transaction not compatible with truncate-before-load\n");
+ exit(1);
+ }
+ }
if (opts->formatName)
{
***************
*** 330,335 ****
--- 355,362 ----
AH = OpenArchive(inputFileSpec, opts->format);
+ /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
/* Let the archiver know how noisy to be */
AH->verbose = opts->verbose;
***************
*** 351,356 ****
--- 378,385 ----
if (opts->tocSummary)
PrintTOCSummary(AH, opts);
+ else if (opts->number_of_threads > 1)
+ RestoreArchiveParallel(AH, opts);
else
RestoreArchive(AH, opts);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers