On 3/21/21 3:56 PM, Tom Lane wrote:
> Jan Wieck <j...@wi3ck.info> writes:
>> So let's focus on the actual problem of running out of XIDs and memory
>> while doing the upgrade involving millions of small large objects.

> Right.  So as far as --single-transaction vs. --create goes, that's
> mostly a definitional problem.  As long as the contents of a DB are
> restored in one transaction, it's not gonna matter if we eat one or
> two more XIDs while creating the DB itself.  So we could either
> relax pg_restore's complaint, or invent a different switch that's
> named to acknowledge that it's not really only one transaction.
>
> That still leaves us with the lots-o-locks problem.  However, once
> we've crossed the Rubicon of "it's not really only one transaction",
> you could imagine that the switch is "--fewer-transactions", and the
> idea is for pg_restore to commit after every (say) 100000 operations.
> That would both bound its lock requirements and greatly cut its XID
> consumption.

That leaves us with three problems:

1) a tremendous number of locks
2) a tremendous amount of memory needed
3) taking forever because the restore is single-threaded

I created a pathological case here on a VM with 24GB of RAM and 80GB of swap sitting on NVMe. The database has 20 million large objects, each of which has 2 GRANTs, 1 COMMENT and 1 SECURITY LABEL (using the dummy provider). Each LO contains only the string "large object <oid>", so the whole database under 9.5 is about 15GB in size.

A stock pg_upgrade to version 14devel using --link takes about 15 hours. This is partly because pg_dump and pg_restore each grow to something like 50GB+ of virtual memory to hold the TOC, which seems out of proportion considering that the entire system catalog on disk is less than 15GB. But aside from the ridiculous amount of swapping, the whole thing also suffers from consuming about 80 million transactions and apparently just as many network round trips, all over a single client connection.
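
For reference, the test database can be populated along these lines (a sketch only: the reader_a/reader_b roles and the comment/label texts are made up, the dummy_seclabel test module supplies the "dummy" label provider, and a real run would split the loop into batches instead of one 20-million-iteration transaction):

  DO $$
  DECLARE
      loid oid;
  BEGIN
      FOR i IN 1..20000000 LOOP
          -- create the LO and store the string "large object <oid>" in it
          loid := lo_create(0);
          PERFORM lo_put(loid, 0, ('large object ' || loid)::bytea);
          -- per LO: 2 GRANTs, 1 COMMENT and 1 SECURITY LABEL
          EXECUTE format('GRANT SELECT ON LARGE OBJECT %s TO reader_a', loid);
          EXECUTE format('GRANT UPDATE ON LARGE OBJECT %s TO reader_b', loid);
          EXECUTE format('COMMENT ON LARGE OBJECT %s IS ''test blob''', loid);
          EXECUTE format('SECURITY LABEL FOR dummy ON LARGE OBJECT %s IS ''unclassified''', loid);
      END LOOP;
  END $$;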


> The work you described sounded like it could fit into that paradigm,
> with the additional ability to run some parallel restore tasks
> that are each consuming a bounded number of locks.

I have attached a POC patch that implements two new options for pg_upgrade.

  --restore-jobs=NUM             --jobs parameter passed to pg_restore
  --restore-blob-batch-size=NUM  number of blobs restored in one xact

It does a bit more than just that. It rearranges the way large objects are dumped so that, in binary upgrade mode, all the commands to recreate one large object (the lo_create() call plus its COMMENT, SECURITY LABEL and GRANT/REVOKE commands) go into a single TOC entry, and that entry is emitted into SECTION_DATA (binary upgrade mode guarantees that there is no actual BLOB data in the dump, so this is safe). This greatly reduces the number of network round trips and, with 8 parallel restore jobs, almost saturates the 4-core VM. Reducing the number of TOC entries also cuts the total virtual memory need of pg_restore down to 15GB, so there is a lot less swapping going on.
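
To illustrate, with --restore-blob-batch-size=2 the SQL stream that pg_restore produces for the BLOB entries would look roughly like this (OIDs, comments and grantees invented for illustration):

  --
  -- Start BLOB restore batch
  --
  BEGIN;

  SELECT pg_catalog.lo_create('16400');
  COMMENT ON LARGE OBJECT 16400 IS 'test blob';
  GRANT SELECT ON LARGE OBJECT 16400 TO reader_a;

  SELECT pg_catalog.lo_create('16401');
  COMMENT ON LARGE OBJECT 16401 IS 'test blob';
  GRANT SELECT ON LARGE OBJECT 16401 TO reader_a;

  --
  -- BLOB batch size reached
  --
  COMMIT;
  BEGIN;

  [... more batches ...]

  --
  -- End BLOB restore batch
  --
  COMMIT;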

It cuts the pg_upgrade time down from 15 hours to 1.5 hours. In that run I used --restore-jobs=8 and --restore-blob-batch-size=10000 (with max_locks_per_transaction=12000, so that one batch worth of large object locks fits into the lock table).

As said, this isn't a "one size fits all" solution. The right pg_upgrade values for --jobs and --restore-jobs really depend on the situation: hundreds of small databases want --jobs, while one database with millions of large objects wants --restore-jobs. A sample invocation is sketched below.
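
For the pathological case above, the invocation would look roughly like this (a sketch; the bindir and datadir paths are placeholders, and max_locks_per_transaction on the new cluster needs headroom above the batch size):

  pg_upgrade --link \
      -b /usr/pgsql-9.5/bin -B /usr/pgsql-14/bin \
      -d /data/pg95 -D /data/pg14 \
      --restore-jobs=8 --restore-blob-batch-size=10000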


Regards, Jan

--
Jan Wieck
Principal Database Engineer
Amazon Web Services
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index c7351a4..4a611d0 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -864,6 +864,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
 	WaitForCommands(AH, pipefd);
 
 	/*
+	 * Close the BLOB batch transaction if one is still open.
+	 */
+	CommitBlobTransaction((Archive *)AH);
+
+	/*
 	 * Disconnect from database and clean up.
 	 */
 	set_cancel_slot_archive(slot, NULL);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 0296b9b..cd8a590 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -203,6 +203,8 @@ typedef struct Archive
 	int			numWorkers;		/* number of parallel processes */
 	char	   *sync_snapshot_id;	/* sync snapshot id for parallel operation */
 
+	int			blobBatchSize;	/* # of blobs to restore per transaction */
+
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
 	bool		std_strings;	/* standard_conforming_strings */
@@ -269,6 +271,7 @@ extern void WriteData(Archive *AH, const void *data, size_t dLen);
 extern int	StartBlob(Archive *AH, Oid oid);
 extern int	EndBlob(Archive *AH, Oid oid);
 
+extern void	CommitBlobTransaction(Archive *AH);
 extern void CloseArchive(Archive *AH);
 
 extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 1f82c64..51a862a 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -68,6 +68,8 @@ typedef struct _parallelReadyList
 	bool		sorted;			/* are valid entries currently sorted? */
 } ParallelReadyList;
 
+static int		blobBatchCount = 0;
+static bool		blobInXact = false;
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const int compression, bool dosync, ArchiveMode mode,
@@ -265,6 +267,8 @@ CloseArchive(Archive *AHX)
 	int			res = 0;
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 
+	CommitBlobTransaction(AHX);
+
 	AH->ClosePtr(AH);
 
 	/* Close the output */
@@ -279,6 +283,24 @@ CloseArchive(Archive *AHX)
 
 /* Public */
 void
+CommitBlobTransaction(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	if (blobInXact)
+	{
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "-- End BLOB restore batch\n");
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "COMMIT;\n\n");
+
+		blobBatchCount = 0;
+		blobInXact = false;
+	}
+}
+
+/* Public */
+void
 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
 {
 	/* Caller can omit dump options, in which case we synthesize them */
@@ -3531,6 +3553,59 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
+	/* We restore BLOBs in batches to reduce XID consumption */
+	if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 0)
+	{
+		if (blobInXact)
+		{
+			/* We are inside a BLOB restore transaction */
+			if (blobBatchCount >= AH->public.blobBatchSize)
+			{
+				/*
+				 * The batch size was reached with the previous BLOB.
+				 * Commit it and start a new batch.
+				 */
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "-- BLOB batch size reached\n");
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "COMMIT;\n");
+				ahprintf(AH, "BEGIN;\n\n");
+
+				blobBatchCount = 1;
+			}
+			else
+			{
+				/* This one still fits into the current batch */
+				blobBatchCount++;
+			}
+		}
+		else
+		{
+			/* Not inside a transaction, start a new batch */
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- Start BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "BEGIN;\n\n");
+
+			blobBatchCount = 1;
+			blobInXact = true;
+		}
+	}
+	else
+	{
+		/* Not a BLOB. If we have a BLOB batch open, close it. */
+		if (blobInXact)
+		{
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- End BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "COMMIT;\n\n");
+
+			blobBatchCount = 0;
+			blobInXact = false;
+		}
+	}
+
 	/* Select owner, schema, tablespace and default AM as necessary */
 	_becomeOwner(AH, te);
 	_selectOutputSchema(AH, te->namespace);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f8bec3f..f153f08 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -165,12 +165,20 @@ static void guessConstraintInheritance(TableInfo *tblinfo, int numTables);
 static void dumpComment(Archive *fout, const char *type, const char *name,
 						const char *namespace, const char *owner,
 						CatalogId catalogId, int subid, DumpId dumpId);
+static bool dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+							 const char *type, const char *name,
+							 const char *namespace, const char *owner,
+							 CatalogId catalogId, int subid, DumpId dumpId);
 static int	findComments(Archive *fout, Oid classoid, Oid objoid,
 						 CommentItem **items);
 static int	collectComments(Archive *fout, CommentItem **items);
 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
 						 const char *namespace, const char *owner,
 						 CatalogId catalogId, int subid, DumpId dumpId);
+static bool dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+							  const char *type, const char *name,
+							  const char *namespace, const char *owner,
+							  CatalogId catalogId, int subid, DumpId dumpId);
 static int	findSecLabels(Archive *fout, Oid classoid, Oid objoid,
 						  SecLabelItem **items);
 static int	collectSecLabels(Archive *fout, SecLabelItem **items);
@@ -227,6 +235,13 @@ static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 					  const char *nspname, const char *owner,
 					  const char *acls, const char *racls,
 					  const char *initacls, const char *initracls);
+static bool dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+						 DumpId objDumpId, DumpId altDumpId,
+						 const char *type, const char *name,
+						 const char *subname,
+						 const char *nspname, const char *owner,
+						 const char *acls, const char *racls,
+						 const char *initacls, const char *initracls);
 
 static void getDependencies(Archive *fout);
 static void BuildArchiveDependencies(Archive *fout);
@@ -3468,11 +3483,44 @@ dumpBlob(Archive *fout, const BlobInfo *binfo)
 {
 	PQExpBuffer cquery = createPQExpBuffer();
 	PQExpBuffer dquery = createPQExpBuffer();
+	PQExpBuffer tag    = createPQExpBuffer();
+	teSection	section = SECTION_PRE_DATA;
 
 	appendPQExpBuffer(cquery,
 					  "SELECT pg_catalog.lo_create('%s');\n",
 					  binfo->dobj.name);
 
+	/*
+	 * In binary upgrade mode we put all the queries to restore
+	 * one large object into a single TOC entry and emit it as
+	 * SECTION_DATA so that they can be restored in parallel.
+	 */
+	if (fout->dopt->binary_upgrade)
+	{
+		section = SECTION_DATA;
+
+		/* Dump comment if any */
+		if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+			dumpCommentQuery(fout, cquery, tag, "LARGE OBJECT",
+							 binfo->dobj.name, NULL, binfo->rolname,
+							 binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+		/* Dump security label if any */
+		if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+			dumpSecLabelQuery(fout, cquery, tag, "LARGE OBJECT",
+							  binfo->dobj.name,
+							  NULL, binfo->rolname,
+							  binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+		/* Dump ACL if any */
+		if (binfo->blobacl && (binfo->dobj.dump & DUMP_COMPONENT_ACL))
+			dumpACLQuery(fout, cquery, tag,
+						 binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+						 binfo->dobj.name, NULL,
+						 NULL, binfo->rolname, binfo->blobacl, binfo->rblobacl,
+						 binfo->initblobacl, binfo->initrblobacl);
+	}
+
 	appendPQExpBuffer(dquery,
 					  "SELECT pg_catalog.lo_unlink('%s');\n",
 					  binfo->dobj.name);
@@ -3482,28 +3530,32 @@ dumpBlob(Archive *fout, const BlobInfo *binfo)
 					 ARCHIVE_OPTS(.tag = binfo->dobj.name,
 								  .owner = binfo->rolname,
 								  .description = "BLOB",
-								  .section = SECTION_PRE_DATA,
+								  .section = section,
 								  .createStmt = cquery->data,
 								  .dropStmt = dquery->data));
 
-	/* Dump comment if any */
-	if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
-		dumpComment(fout, "LARGE OBJECT", binfo->dobj.name,
-					NULL, binfo->rolname,
-					binfo->dobj.catId, 0, binfo->dobj.dumpId);
-
-	/* Dump security label if any */
-	if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
-		dumpSecLabel(fout, "LARGE OBJECT", binfo->dobj.name,
-					 NULL, binfo->rolname,
-					 binfo->dobj.catId, 0, binfo->dobj.dumpId);
-
-	/* Dump ACL if any */
-	if (binfo->blobacl && (binfo->dobj.dump & DUMP_COMPONENT_ACL))
-		dumpACL(fout, binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
-				binfo->dobj.name, NULL,
-				NULL, binfo->rolname, binfo->blobacl, binfo->rblobacl,
-				binfo->initblobacl, binfo->initrblobacl);
+	if (!fout->dopt->binary_upgrade)
+	{
+		/* Dump comment if any */
+		if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+			dumpComment(fout, "LARGE OBJECT", binfo->dobj.name,
+						NULL, binfo->rolname,
+						binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+		/* Dump security label if any */
+		if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+			dumpSecLabel(fout, "LARGE OBJECT", binfo->dobj.name,
+						 NULL, binfo->rolname,
+						 binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+		/* Dump ACL if any */
+		if (binfo->blobacl && (binfo->dobj.dump & DUMP_COMPONENT_ACL))
+			dumpACL(fout, binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+					binfo->dobj.name, NULL,
+					NULL, binfo->rolname, binfo->blobacl, binfo->rblobacl,
+					binfo->initblobacl, binfo->initrblobacl);
+	}
 
 	destroyPQExpBuffer(cquery);
 	destroyPQExpBuffer(dquery);
+	destroyPQExpBuffer(tag);
@@ -9868,25 +9920,56 @@ dumpComment(Archive *fout, const char *type, const char *name,
 			const char *namespace, const char *owner,
 			CatalogId catalogId, int subid, DumpId dumpId)
 {
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpCommentQuery(fout, query, tag, type, name, namespace, owner,
+						 catalogId, subid, dumpId))
+	{
+		/*
+		 * We mark comments as SECTION_NONE because they really belong in the
+		 * same section as their parent, whether that is pre-data or
+		 * post-data.
+		 */
+		ArchiveEntry(fout, nilCatalogId, createDumpId(),
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = namespace,
+								  .owner = owner,
+								  .description = "COMMENT",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = &dumpId,
+								  .nDeps = 1));
+	}
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+				 const char *type, const char *name,
+				 const char *namespace, const char *owner,
+				 CatalogId catalogId, int subid, DumpId dumpId)
+{
 	DumpOptions *dopt = fout->dopt;
 	CommentItem *comments;
 	int			ncomments;
 
 	/* do nothing, if --no-comments is supplied */
 	if (dopt->no_comments)
-		return;
+		return false;
 
 	/* Comments are schema not data ... except blob comments are data */
 	if (strcmp(type, "LARGE OBJECT") != 0)
 	{
 		if (dopt->dataOnly)
-			return;
+			return false;
 	}
 	else
 	{
 		/* We do dump blob comments in binary-upgrade mode */
 		if (dopt->schemaOnly && !dopt->binary_upgrade)
-			return;
+			return false;
 	}
 
 	/* Search for comments associated with catalogId, using table */
@@ -9905,9 +9988,6 @@ dumpComment(Archive *fout, const char *type, const char *name,
 	/* If a comment exists, build COMMENT ON statement */
 	if (ncomments > 0)
 	{
-		PQExpBuffer query = createPQExpBuffer();
-		PQExpBuffer tag = createPQExpBuffer();
-
 		appendPQExpBuffer(query, "COMMENT ON %s ", type);
 		if (namespace && *namespace)
 			appendPQExpBuffer(query, "%s.", fmtId(namespace));
@@ -9917,24 +9997,10 @@ dumpComment(Archive *fout, const char *type, const char *name,
 
 		appendPQExpBuffer(tag, "%s %s", type, name);
 
-		/*
-		 * We mark comments as SECTION_NONE because they really belong in the
-		 * same section as their parent, whether that is pre-data or
-		 * post-data.
-		 */
-		ArchiveEntry(fout, nilCatalogId, createDumpId(),
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = namespace,
-								  .owner = owner,
-								  .description = "COMMENT",
-								  .section = SECTION_NONE,
-								  .createStmt = query->data,
-								  .deps = &dumpId,
-								  .nDeps = 1));
-
-		destroyPQExpBuffer(query);
-		destroyPQExpBuffer(tag);
+		return true;
 	}
+
+	return false;
 }
 
 /*
@@ -15070,18 +15136,63 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 		const char *initacls, const char *initracls)
 {
 	DumpId		aclDumpId = InvalidDumpId;
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpACLQuery(fout, query, tag, objDumpId, altDumpId,
+					 type, name, subname, nspname, owner,
+					 acls, racls, initacls, initracls))
+	{
+		DumpId		aclDeps[2];
+		int			nDeps = 0;
+
+		/*
+		 * Note: dumpACLQuery() above has already built the tag
+		 * for this entry, so don't append to it again here.
+		 */
+
+		aclDeps[nDeps++] = objDumpId;
+		if (altDumpId != InvalidDumpId)
+			aclDeps[nDeps++] = altDumpId;
+
+		aclDumpId = createDumpId();
+
+		ArchiveEntry(fout, nilCatalogId, aclDumpId,
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = nspname,
+								  .owner = owner,
+								  .description = "ACL",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = aclDeps,
+								  .nDeps = nDeps));
+
+	}
+
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+
+	return aclDumpId;
+}
+
+static bool
+dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+			 DumpId objDumpId, DumpId altDumpId,
+			 const char *type, const char *name, const char *subname,
+			 const char *nspname, const char *owner,
+			 const char *acls, const char *racls,
+			 const char *initacls, const char *initracls)
+{
 	DumpOptions *dopt = fout->dopt;
-	PQExpBuffer sql;
+	bool		haveACL = false;
 
 	/* Do nothing if ACL dump is not enabled */
 	if (dopt->aclsSkip)
-		return InvalidDumpId;
+		return false;
 
 	/* --data-only skips ACLs *except* BLOB ACLs */
 	if (dopt->dataOnly && strcmp(type, "LARGE OBJECT") != 0)
-		return InvalidDumpId;
-
-	sql = createPQExpBuffer();
+		return false;
 
 	/*
 	 * Check to see if this object has had any initial ACLs included for it.
@@ -15093,54 +15204,31 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 	 */
 	if (strlen(initacls) != 0 || strlen(initracls) != 0)
 	{
-		appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
+		haveACL = true;
+		appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
 		if (!buildACLCommands(name, subname, nspname, type,
 							  initacls, initracls, owner,
-							  "", fout->remoteVersion, sql))
+							  "", fout->remoteVersion, query))
 			fatal("could not parse initial GRANT ACL list (%s) or initial REVOKE ACL list (%s) for object \"%s\" (%s)",
 				  initacls, initracls, name, type);
-		appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
+		appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
 	}
 
 	if (!buildACLCommands(name, subname, nspname, type,
 						  acls, racls, owner,
-						  "", fout->remoteVersion, sql))
+						  "", fout->remoteVersion, query))
 		fatal("could not parse GRANT ACL list (%s) or REVOKE ACL list (%s) for object \"%s\" (%s)",
 			  acls, racls, name, type);
 
-	if (sql->len > 0)
+	if ((haveACL || query->len > 0) && tag != NULL)
 	{
-		PQExpBuffer tag = createPQExpBuffer();
-		DumpId		aclDeps[2];
-		int			nDeps = 0;
-
 		if (subname)
 			appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname);
 		else
 			appendPQExpBuffer(tag, "%s %s", type, name);
-
-		aclDeps[nDeps++] = objDumpId;
-		if (altDumpId != InvalidDumpId)
-			aclDeps[nDeps++] = altDumpId;
-
-		aclDumpId = createDumpId();
-
-		ArchiveEntry(fout, nilCatalogId, aclDumpId,
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = nspname,
-								  .owner = owner,
-								  .description = "ACL",
-								  .section = SECTION_NONE,
-								  .createStmt = sql->data,
-								  .deps = aclDeps,
-								  .nDeps = nDeps));
-
-		destroyPQExpBuffer(tag);
 	}
 
-	destroyPQExpBuffer(sql);
-
-	return aclDumpId;
+	return haveACL || query->len > 0;
 }
 
 /*
@@ -15166,34 +15254,58 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
 			 const char *namespace, const char *owner,
 			 CatalogId catalogId, int subid, DumpId dumpId)
 {
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpSecLabelQuery(fout, query, tag, type, name,
+						  namespace, owner, catalogId, subid, dumpId))
+	{
+		ArchiveEntry(fout, nilCatalogId, createDumpId(),
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = namespace,
+								  .owner = owner,
+								  .description = "SECURITY LABEL",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = &dumpId,
+								  .nDeps = 1));
+	}
+
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+				  const char *type, const char *name,
+				  const char *namespace, const char *owner,
+				  CatalogId catalogId, int subid, DumpId dumpId)
+{
 	DumpOptions *dopt = fout->dopt;
 	SecLabelItem *labels;
 	int			nlabels;
 	int			i;
-	PQExpBuffer query;
 
 	/* do nothing, if --no-security-labels is supplied */
 	if (dopt->no_security_labels)
-		return;
+		return false;
 
 	/* Security labels are schema not data ... except blob labels are data */
 	if (strcmp(type, "LARGE OBJECT") != 0)
 	{
 		if (dopt->dataOnly)
-			return;
+			return false;
 	}
 	else
 	{
 		/* We do dump blob security labels in binary-upgrade mode */
 		if (dopt->schemaOnly && !dopt->binary_upgrade)
-			return;
+			return false;
 	}
 
 	/* Search for security labels associated with catalogId, using table */
 	nlabels = findSecLabels(fout, catalogId.tableoid, catalogId.oid, &labels);
 
-	query = createPQExpBuffer();
-
 	for (i = 0; i < nlabels; i++)
 	{
 		/*
@@ -15214,22 +15326,11 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
 
 	if (query->len > 0)
 	{
-		PQExpBuffer tag = createPQExpBuffer();
-
 		appendPQExpBuffer(tag, "%s %s", type, name);
-		ArchiveEntry(fout, nilCatalogId, createDumpId(),
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = namespace,
-								  .owner = owner,
-								  .description = "SECURITY LABEL",
-								  .section = SECTION_NONE,
-								  .createStmt = query->data,
-								  .deps = &dumpId,
-								  .nDeps = 1));
-		destroyPQExpBuffer(tag);
+		return true;
 	}
 
-	destroyPQExpBuffer(query);
+	return false;
 }
 
 /*
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 589b4ae..b16db03 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -59,6 +59,7 @@ main(int argc, char **argv)
 	int			c;
 	int			exit_code;
 	int			numWorkers = 1;
+	int			blobBatchSize = 0;
 	Archive    *AH;
 	char	   *inputFileSpec;
 	static int	disable_triggers = 0;
@@ -120,6 +121,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &no_publications, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
+		{"restore-blob-batch-size", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -280,6 +282,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
+			case 4:				/* # of blobs to restore per transaction */
+				blobBatchSize = atoi(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -434,6 +440,7 @@ main(int argc, char **argv)
 		SortTocFromFile(AH);
 
 	AH->numWorkers = numWorkers;
+	AH->blobBatchSize = blobBatchSize;
 
 	if (opts->tocSummary)
 		PrintTOCSummary(AH);
@@ -506,6 +513,8 @@ usage(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --restore-blob-batch-size=NUM\n"
+			 "                               attempt to restore NUM large objects per transaction\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 9c9b313..868e9f6 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,8 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"index-collation-versions-unknown", no_argument, NULL, 2},
+		{"restore-jobs", required_argument, NULL, 3},
+		{"restore-blob-batch-size", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -208,6 +210,14 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.ind_coll_unknown = true;
 				break;
 
+			case 3:
+				user_opts.restore_jobs = atoi(optarg);
+				break;
+
+			case 4:
+				user_opts.blob_batch_size = atoi(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -314,6 +324,9 @@
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --index-collation-versions-unknown\n"));
 	printf(_("                                mark text indexes as needing to be rebuilt\n"));
+	printf(_("  --restore-blob-batch-size=NUM attempt to restore NUM large objects per\n"));
+	printf(_("                                transaction\n"));
+	printf(_("  --restore-jobs=NUM            --jobs parameter passed to pg_restore\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index e23b8ca..095e980 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -385,10 +385,13 @@ create_new_objects(void)
 		parallel_exec_prog(log_file_name,
 						   NULL,
 						   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+						   "--jobs %d --restore-blob-batch-size %d "
 						   "--dbname template1 \"%s\"",
 						   new_cluster.bindir,
 						   cluster_conn_opts(&new_cluster),
 						   create_opts,
+						   Max(user_opts.restore_jobs, 1),
+						   user_opts.blob_batch_size,
 						   sql_file_name);
 	}
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 919a784..5647f96 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -291,6 +291,8 @@ typedef struct
 								 * changes */
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
+	int			restore_jobs;	/* number of pg_restore --jobs to use */
+	int			blob_batch_size; /* number of blobs to restore per xact */
 	char	   *socketdir;		/* directory to use for Unix sockets */
 	bool		ind_coll_unknown;	/* mark unknown index collation versions */
 } UserOpts;
