Rebased and attached new patch. Should I add it to July's commitfest?


On Fri, 4 Apr 2025, Dimitrios Apostolou wrote:

Hello list,

based on the delays I experienced in pg_restore, as described at:

https://www.postgresql.org/message-id/flat/6bd16bdb-aa5e-0512-739d-b84100596...@gmx.net

I noticed that every pg_restore worker process was going through the same seek-and-read scan in parallel, which made the situation even worse. This patch moves that phase to the parent process, before fork(), so that the children have the necessary information from birth.
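
To illustrate why doing the work before fork() is enough, here is a minimal standalone sketch (POSIX only). It is not code from the patch: demo_offsets and demo_child_inherits_offsets are hypothetical names, and the offsets are made-up values. The parent fills a table once, and the forked child sees the same contents without rescanning anything.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define DEMO_N 3

/* Hypothetical offset table; filled by the parent before fork(). */
static long long demo_offsets[DEMO_N];

/*
 * Fill the table once in the parent, fork a worker, and let the worker
 * report whether it sees the filled table.  Returns 1 if the child saw
 * the parent's values, 0 if it did not, -1 on fork/wait failure.
 */
static int
demo_child_inherits_offsets(void)
{
	pid_t		pid;
	int			status;

	/* The hard-coded assignments below assume exactly three slots. */
	assert(DEMO_N == 3);

	/* Made-up offsets, standing in for the result of the single scan. */
	demo_offsets[0] = 1000;
	demo_offsets[1] = 1100;
	demo_offsets[2] = 1350;

	pid = fork();
	if (pid < 0)
		return -1;
	if (pid == 0)
	{
		/* Child: the table is already populated, no rescan needed. */
		_exit(demo_offsets[2] == 1350 ? 0 : 1);
	}

	/* Parent: collect the child's verdict from its exit status. */
	if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status))
		return -1;
	return WEXITSTATUS(status) == 0 ? 1 : 0;
}
```

Calling demo_child_inherits_offsets() from a small main() should return 1 on a POSIX system, since the child gets a copy of the parent's memory at fork() time.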

Copying the commit message:

A pg_dump custom format archive without offsets in the table of
contents is usually generated when pg_dump writes to stdout instead of
a file. When doing parallel pg_restore (-j) from such a file, every
worker process was scanning the full archive sequentially, in order to
build the offset table and find the parts assigned to restore. This led
to the worker processes competing for I/O.

This patch moves this offset-table building phase to the parent process,
before forking the worker processes.

The upside is that we now have only one extra scan of the file.
And this scan happens without other competing I/O, so it completes
faster.

The downside is that there is a delay before spawning the children and
starting to assign jobs to them.
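
As a rough sketch of the idea (not the patch itself), the single scan boils down to accumulating block sizes into start offsets and then deriving each entry's length as the delta to the next entry's start. DemoEntry and demo_build_offsets are hypothetical names, and the block sizes are passed in as an array instead of being read from an archive:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a TOC entry; not pg_restore's TocEntry. */
typedef struct
{
	long long	data_pos;	/* start offset of the entry's data */
	long long	data_len;	/* derived length, used to order restore jobs */
} DemoEntry;

/*
 * Assign each entry its start offset by accumulating the sizes of the
 * blocks seen so far, then derive each length as the delta to the next
 * entry's start (the last entry uses the end of the data section).
 * Returns the end offset.
 */
static long long
demo_build_offsets(DemoEntry *entries, const long long *block_sizes,
				   size_t n, long long first_data_pos)
{
	long long	cur = first_data_pos;
	size_t		i;

	assert(entries != NULL || n == 0);

	/* Pass over the blocks: record where each one starts. */
	for (i = 0; i < n; i++)
	{
		entries[i].data_pos = cur;
		cur += block_sizes[i];
	}

	/* Lengths are deltas between consecutive start offsets. */
	for (i = 0; i < n; i++)
	{
		long long	next = (i + 1 < n) ? entries[i + 1].data_pos : cur;

		entries[i].data_len = next - entries[i].data_pos;
	}

	return cur;
}
```

In the real code the sizes are only known after skipping over each block in the file, which is why the scan has to read through the whole archive once.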


What do you think?

Thanks,
Dimitris

From 8eced02eacf62aee5766b0f81b5eb500da0e6140 Mon Sep 17 00:00:00 2001
From: Dimitrios Apostolou <ji...@qt.io>
Date: Sat, 29 Mar 2025 01:16:19 +0100
Subject: [PATCH v2] parallel pg_restore: move offset-building phase to before
 forking

A pg_dump custom format archive without offsets in the table of contents
is usually generated when pg_dump writes to stdout instead of a file.

When doing parallel pg_restore (-j) from such a file, every worker
process was scanning the full archive sequentially, in order to build
the offset table and find the parts assigned to restore.

This patch moves this offset-table building phase to the parent process,
before forking the worker processes.

The upside is that we now have only one extra scan of the file.
And this scan happens without other competing I/O, so it completes
faster.

The downside is that there is a delay before spawning the children and
starting to assign jobs to them.
---
 src/bin/pg_dump/pg_backup_custom.c | 91 ++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 6 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 27695e24dde..b65cf56d564 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -815,64 +815,143 @@ _ReopenArchive(ArchiveHandle *AH)
 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
 	if (!AH->FH)
 		pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
 
 	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
 		pg_fatal("could not set seek position in archive file: %m");
 }
 
+/*
+ * Read through (skip) one full data entry from the archive.
+ * Return the size, and save the dumpId of the scanned entry.
+ */
+static size_t
+_ReadOneFullEntry(ArchiveHandle *AH, pgoff_t curPos, DumpId *scanned_id)
+{
+	int			blkType;
+
+	_readBlockHeader(AH, &blkType, scanned_id);
+
+	if (blkType == EOF)
+		return 0;
+
+	switch (blkType)
+	{
+		case BLK_DATA:
+			_skipData(AH);
+			break;
+
+		case BLK_BLOBS:
+			_skipLOs(AH);
+			break;
+
+		default:				/* Always have a default */
+			pg_fatal("unrecognized data block type (%d) while searching archive",
+					 blkType);
+			break;
+	}
+
+	return ftello(AH->FH) - curPos;
+}
+
 /*
  * Prepare for parallel restore.
  *
  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
  * TOC entries' dataLength fields with appropriate values to guide the
  * ordering of restore jobs.  The source of said data is format-dependent,
  * as is the exact meaning of the values.
  *
  * A format module might also choose to do other setup here.
  */
 static void
 _PrepParallelRestore(ArchiveHandle *AH)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
+	TocEntry   *te;
 	TocEntry   *prev_te = NULL;
 	lclTocEntry *prev_tctx = NULL;
-	TocEntry   *te;
+	const int	hasSeek = ctx->hasSeek;
+	pgoff_t		curPos = ctx->lastFilePos;
+	bool		first_data_entry = true;
+
+	if (hasSeek &&
+		fseeko(AH->FH, curPos, SEEK_SET) != 0)
+		pg_fatal("error during file seek: %m");
 
 	/*
 	 * Knowing that the data items were dumped out in TOC order, we can
 	 * reconstruct the length of each item as the delta to the start offset of
 	 * the next data item.
 	 */
 	for (te = AH->toc->next; te != AH->toc; te = te->next)
 	{
 		lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
+		if (tctx->dataState == K_OFFSET_POS_SET)
+		{
+			if (first_data_entry && tctx->dataPos != curPos)
+				pg_fatal("data for first archive entry %d is recorded to"
+						 " start at %lld but it really starts at %lld",
+						 te->dumpId, (long long) tctx->dataPos, (long long) curPos);
+		}
+
+		/*
+		 * If the archive's TOC is missing the positions of each entry, then
+		 * most likely the archive was written to a pipe. If we are not
+		 * reading from a pipe now, we can do one extra scan to find the
+		 * missing positions. They are needed in parallel restore in order to
+		 * assign TOC entries as jobs to the workers.
+		 */
+		else if (tctx->dataState == K_OFFSET_POS_NOT_SET && hasSeek)
+		{
+			DumpId		dump_id;
+			size_t		entry_size;
+
+			if (first_data_entry)
+			{
+				pg_log_info("archive does not contain offsets;"
+							" doing one full scan to calculate them...");
+			}
+			/* Fill in the missing info */
+			tctx->dataPos = curPos;
+			tctx->dataState = K_OFFSET_POS_SET;
+			/* Advance */
+			entry_size = _ReadOneFullEntry(AH, curPos, &dump_id);
+			if (dump_id != te->dumpId)
+				pg_fatal("found unexpected dump ID %d -- expected %d",
+						 dump_id, te->dumpId);
+			curPos += entry_size;
+		}
+
 		/*
-		 * Ignore entries without a known data offset; if we were unable to
-		 * seek to rewrite the TOC when creating the archive, this'll be all
-		 * of them, and we'll end up with no size estimates.
+		 * When we can't seek, ignore data entries without a known data
+		 * offset. This shouldn't happen in parallel restore, though, because
+		 * the archive should always be seekable. Also skip K_OFFSET_NO_DATA
+		 * entries since they are not present in the data section of the
+		 * archive.
 		 */
-		if (tctx->dataState != K_OFFSET_POS_SET)
+		else
 			continue;
 
 		/* Compute previous data item's length */
 		if (prev_te)
 		{
 			if (tctx->dataPos > prev_tctx->dataPos)
 				prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
 		}
 
 		prev_te = te;
 		prev_tctx = tctx;
+		first_data_entry = false;
 	}
 
 	/* If OK to seek, we can determine the length of the last item */
-	if (prev_te && ctx->hasSeek)
+	if (prev_te && hasSeek)
 	{
 		pgoff_t		endpos;
 
 		if (fseeko(AH->FH, 0, SEEK_END) != 0)
 			pg_fatal("error during file seek: %m");
 		endpos = ftello(AH->FH);
 		if (endpos > prev_tctx->dataPos)
 			prev_te->dataLength = endpos - prev_tctx->dataPos;
-- 
2.49.0
