Hello list,

Following up on the pg_restore delays I described at:

https://www.postgresql.org/message-id/flat/6bd16bdb-aa5e-0512-739d-b84100596...@gmx.net

I noticed that the seek-and-read behaviour was exhibited by every one of
the pg_restore worker processes in parallel, which made the situation
even worse. With this patch I moved this phase to the parent process,
before fork(), so that the children have the necessary information from
birth. Copying the commit message:

    A pg_dump custom format archive without offsets in the table of
    contents is usually generated when pg_dump writes to stdout instead
    of a file. When doing parallel pg_restore (-j) from such a file,
    every worker process was scanning the full archive sequentially in
    order to build the offset table and find the parts it was assigned
    to restore. This led to the worker processes competing for I/O.

    This patch moves the offset-table-building phase to the parent
    process, before the worker processes are forked. The upside is that
    there is now only one extra scan of the file, and that scan happens
    without other competing I/O, so it completes faster. The downside
    is that there is a delay before the children are spawned and jobs
    start being assigned to them.

What do you think?

Thanks,
Dimitris
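P.S. For anyone skimming, here is a minimal standalone sketch of the
pattern the patch applies, detached from pg_restore: the parent fills in
the offset table once, before any fork(), and every worker then inherits
the completed table through its copy of the parent's memory instead of
rescanning the file itself. All names and numbers below are made up for
the demo; this is not the pg_restore code.

    /* sketch.c -- illustrative only, not pg_restore code */
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h>

    #define NENTRIES 4
    #define NWORKERS 2

    static off_t offsets[NENTRIES];     /* hypothetical per-entry offsets */

    /* Stand-in for the single sequential scan that fills in the table. */
    static void
    build_offset_table(void)
    {
        for (int i = 0; i < NENTRIES; i++)
            offsets[i] = (off_t) i * 8192;      /* fake positions */
    }

    int
    main(void)
    {
        build_offset_table();           /* done once, before any fork() */

        for (int w = 0; w < NWORKERS; w++)
        {
            if (fork() == 0)
            {
                /* Child: the table is already populated; no rescan. */
                printf("worker %d sees offsets[1] = %lld\n",
                       w, (long long) offsets[1]);
                _exit(0);
            }
        }
        while (wait(NULL) > 0)          /* reap all workers */
            ;
        return 0;
    }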
From 8aaf72bb31fcb55ed57893e64b5cbb14e23d14c7 Mon Sep 17 00:00:00 2001
From: Dimitrios Apostolou <ji...@qt.io>
Date: Sat, 29 Mar 2025 01:16:19 +0100
Subject: [PATCH v1] parallel pg_restore: move offset-building phase to before
 forking

A pg_dump custom format archive without offsets in the table of
contents is usually generated when pg_dump writes to stdout instead of
a file. When doing parallel pg_restore (-j) from such a file, every
worker process was scanning the full archive sequentially in order to
build the offset table and find the parts it was assigned to restore.
This led to the worker processes competing for I/O.

This patch moves the offset-table-building phase to the parent process,
before the worker processes are forked. The upside is that there is now
only one extra scan of the file, and that scan happens without other
competing I/O, so it completes faster. The downside is that there is a
delay before the children are spawned and jobs start being assigned to
them.
---
 src/bin/pg_dump/pg_backup_custom.c | 91 ++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 6 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 6ed5f9d90e9..3bf17c104d3 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -812,66 +812,145 @@ _ReopenArchive(ArchiveHandle *AH)
     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     if (!AH->FH)
         pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
         pg_fatal("could not set seek position in archive file: %m");
 }
 
+/*
+ * Read through (skip) one full data entry from the archive, starting at
+ * curPos.  Return the entry's size in bytes, and save the dumpId of the
+ * scanned entry into *scanned_id.
+ */
+static pgoff_t
+_ReadOneFullEntry(ArchiveHandle *AH, pgoff_t curPos, DumpId *scanned_id)
+{
+    int         blkType;
+
+    _readBlockHeader(AH, &blkType, scanned_id);
+
+    if (blkType == EOF)
+        return 0;
+
+    switch (blkType)
+    {
+        case BLK_DATA:
+            _skipData(AH);
+            break;
+
+        case BLK_BLOBS:
+            _skipLOs(AH);
+            break;
+
+        default:                /* Always have a default */
+            pg_fatal("unrecognized data block type (%d) while searching archive",
+                     blkType);
+            break;
+    }
+
+    return ftello(AH->FH) - curPos;
+}
+
 /*
  * Prepare for parallel restore.
  *
  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
  * TOC entries' dataLength fields with appropriate values to guide the
  * ordering of restore jobs.  The source of said data is format-dependent,
  * as is the exact meaning of the values.
  *
  * A format module might also choose to do other setup here.
  */
 static void
 _PrepParallelRestore(ArchiveHandle *AH)
 {
     lclContext *ctx = (lclContext *) AH->formatData;
+    TocEntry   *te;
     TocEntry   *prev_te = NULL;
     lclTocEntry *prev_tctx = NULL;
-    TocEntry   *te;
+    const int   hasSeek = ctx->hasSeek;
+    pgoff_t     curPos = ctx->lastFilePos;
+    bool        first_data_entry = true;
+
+    if (hasSeek &&
+        fseeko(AH->FH, curPos, SEEK_SET) != 0)
+        pg_fatal("error during file seek: %m");
 
     /*
      * Knowing that the data items were dumped out in TOC order, we can
      * reconstruct the length of each item as the delta to the start offset of
      * the next data item.
      */
     for (te = AH->toc->next; te != AH->toc; te = te->next)
     {
         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
+        if (tctx->dataState == K_OFFSET_POS_SET)
+        {
+            if (first_data_entry && tctx->dataPos != curPos)
+                pg_fatal("data for first archive entry %d is recorded to"
+                         " start at %lld but it really starts at %lld",
+                         te->dumpId, (long long) tctx->dataPos,
+                         (long long) curPos);
+        }
+
+        /*
+         * If the archive's TOC is missing the positions of each entry, then
+         * most likely the archive was written to a pipe.  If we are not
+         * reading from a pipe now, we can do one extra scan to find the
+         * missing positions.  They are needed in parallel restore in order
+         * to assign TOC entries as jobs to the workers.
+         */
+        else if (tctx->dataState == K_OFFSET_POS_NOT_SET && hasSeek)
+        {
+            DumpId      dump_id;
+            pgoff_t     entry_size;
+
+            if (first_data_entry)
+            {
+                pg_log_info("archive does not contain offsets;"
+                            " doing one full scan to calculate them...");
+            }
+            /* Fill in the missing info */
+            tctx->dataPos = curPos;
+            tctx->dataState = K_OFFSET_POS_SET;
+            /* Advance curPos past this entry, to the start of the next one */
+            entry_size = _ReadOneFullEntry(AH, curPos, &dump_id);
+            if (dump_id != te->dumpId)
+                pg_fatal("found unexpected dump ID %d -- expected %d",
+                         dump_id, te->dumpId);
+            curPos += entry_size;
+        }
+
         /*
-         * Ignore entries without a known data offset; if we were unable to
-         * seek to rewrite the TOC when creating the archive, this'll be all
-         * of them, and we'll end up with no size estimates.
+         * When we can't seek, ignore data entries without a known data
+         * offset.  This shouldn't happen in parallel restore though, as the
+         * archive should always be seekable.  Also skip K_OFFSET_NO_DATA
+         * entries, since they are not present in the data section of the
+         * archive.
          */
-        if (tctx->dataState != K_OFFSET_POS_SET)
+        else
             continue;
 
         /* Compute previous data item's length */
         if (prev_te)
         {
             if (tctx->dataPos > prev_tctx->dataPos)
                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
         }
 
         prev_te = te;
         prev_tctx = tctx;
+        first_data_entry = false;
     }
 
     /* If OK to seek, we can determine the length of the last item */
-    if (prev_te && ctx->hasSeek)
+    if (prev_te && hasSeek)
     {
         pgoff_t     endpos;
 
         if (fseeko(AH->FH, 0, SEEK_END) != 0)
             pg_fatal("error during file seek: %m");
         endpos = ftello(AH->FH);
         if (endpos > prev_tctx->dataPos)
             prev_te->dataLength = endpos - prev_tctx->dataPos;
     }
-- 
2.49.0