While testing parallel dump/restore over the past few days, I noticed that
parallel restore seemed to issue an awful lot of duplicative SET commands,
which I traced to the fact that restore was doing _doSetFixedOutputState(AH)
in the wrong place, i.e., once per TOC entry rather than once per worker.
Another thing that's
useless overhead is that lockTableForWorker() is doing an actual SQL query
to fetch the name of a table that we already have at hand.  Poking around
in this area also convinced me that it was pretty weird for CloneArchive
to be managing encoding, and only encoding, when cloning a dump
connection; that should be handled by setup_connection.  I also noticed
several unchecked strdup() calls that of course should be pg_strdup().

I put together the attached patch that cleans all this up.  It's hard to
show any noticeable performance difference, but the query log certainly
looks cleaner.  Any objections?

                        regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index e9e8698..6a808dc 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
*************** IsEveryWorkerIdle(ParallelState *pstate)
*** 802,808 ****
  static void
  lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
  {
- 	Archive    *AHX = (Archive *) AH;
  	const char *qualId;
  	PQExpBuffer query;
  	PGresult   *res;
--- 802,807 ----
*************** lockTableForWorker(ArchiveHandle *AH, To
*** 813,845 ****
  
  	query = createPQExpBuffer();
  
! 	/*
! 	 * XXX this is an unbelievably expensive substitute for knowing how to dig
! 	 * a table name out of a TocEntry.
! 	 */
! 	appendPQExpBuffer(query,
! 					  "SELECT pg_namespace.nspname,"
! 					  "       pg_class.relname "
! 					  "  FROM pg_class "
! 					"  JOIN pg_namespace on pg_namespace.oid = relnamespace "
! 					  " WHERE pg_class.oid = %u", te->catalogId.oid);
! 
! 	res = PQexec(AH->connection, query->data);
! 
! 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
! 		exit_horribly(modulename,
! 					  "could not get relation name for OID %u: %s\n",
! 					  te->catalogId.oid, PQerrorMessage(AH->connection));
! 
! 	resetPQExpBuffer(query);
! 
! 	qualId = fmtQualifiedId(AHX->remoteVersion,
! 							PQgetvalue(res, 0, 0),
! 							PQgetvalue(res, 0, 1));
  
  	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
  					  qualId);
- 	PQclear(res);
  
  	res = PQexec(AH->connection, query->data);
  
--- 812,821 ----
  
  	query = createPQExpBuffer();
  
! 	qualId = fmtQualifiedId(AH->public.remoteVersion, te->namespace, te->tag);
  
  	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
  					  qualId);
  
  	res = PQexec(AH->connection, query->data);
  
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index ad8e132..32e1c82 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** restore_toc_entries_postfork(ArchiveHand
*** 3893,3898 ****
--- 3893,3899 ----
  					ropt->pghost, ropt->pgport, ropt->username,
  					ropt->promptPassword);
  
+ 	/* re-establish fixed state */
  	_doSetFixedOutputState(AH);
  
  	/*
*************** parallel_restore(ParallelArgs *args)
*** 4071,4080 ****
  	TocEntry   *te = args->te;
  	int			status;
  
- 	_doSetFixedOutputState(AH);
- 
  	Assert(AH->connection != NULL);
  
  	AH->public.n_errors = 0;
  
  	/* Restore the TOC item */
--- 4072,4080 ----
  	TocEntry   *te = args->te;
  	int			status;
  
  	Assert(AH->connection != NULL);
  
+ 	/* Count only errors associated with this TOC entry */
  	AH->public.n_errors = 0;
  
  	/* Restore the TOC item */
*************** CloneArchive(ArchiveHandle *AH)
*** 4443,4452 ****
--- 4443,4456 ----
  		RestoreOptions *ropt = AH->public.ropt;
  
  		Assert(AH->connection == NULL);
+ 
  		/* this also sets clone->connection */
  		ConnectDatabase((Archive *) clone, ropt->dbname,
  						ropt->pghost, ropt->pgport, ropt->username,
  						ropt->promptPassword);
+ 
+ 		/* re-establish fixed state */
+ 		_doSetFixedOutputState(clone);
  	}
  	else
  	{
*************** CloneArchive(ArchiveHandle *AH)
*** 4454,4460 ****
  		char	   *pghost;
  		char	   *pgport;
  		char	   *username;
- 		const char *encname;
  
  		Assert(AH->connection != NULL);
  
--- 4458,4463 ----
*************** CloneArchive(ArchiveHandle *AH)
*** 4468,4485 ****
  		pghost = PQhost(AH->connection);
  		pgport = PQport(AH->connection);
  		username = PQuser(AH->connection);
- 		encname = pg_encoding_to_char(AH->public.encoding);
  
  		/* this also sets clone->connection */
  		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
  
! 		/*
! 		 * Set the same encoding, whatever we set here is what we got from
! 		 * pg_encoding_to_char(), so we really shouldn't run into an error
! 		 * setting that very same value. Also see the comment in
! 		 * SetupConnection().
! 		 */
! 		PQsetClientEncoding(clone->connection, encname);
  	}
  
  	/* Let the format-specific code have a chance too */
--- 4471,4481 ----
  		pghost = PQhost(AH->connection);
  		pgport = PQport(AH->connection);
  		username = PQuser(AH->connection);
  
  		/* this also sets clone->connection */
  		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
  
! 		/* setupDumpWorker will fix up connection state */
  	}
  
  	/* Let the format-specific code have a chance too */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f85778d..3608d88 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
*************** setup_connection(Archive *AH, const char
*** 952,961 ****
  	const char *std_strings;
  
  	/*
! 	 * Set the client encoding if requested. If dumpencoding == NULL then
! 	 * either it hasn't been requested or we're a cloned connection and then
! 	 * this has already been set in CloneArchive according to the original
! 	 * connection encoding.
  	 */
  	if (dumpencoding)
  	{
--- 952,958 ----
  	const char *std_strings;
  
  	/*
! 	 * Set the client encoding if requested.
  	 */
  	if (dumpencoding)
  	{
*************** setup_connection(Archive *AH, const char
*** 973,979 ****
  	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
  	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
  
! 	/* Set the role if requested */
  	if (!use_role && AH->use_role)
  		use_role = AH->use_role;
  
--- 970,980 ----
  	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
  	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
  
! 	/*
! 	 * Set the role if requested.  In a parallel dump worker, we'll be passed
! 	 * use_role == NULL, but AH->use_role is already set (if user specified it
! 	 * originally) and we should use that.
! 	 */
  	if (!use_role && AH->use_role)
  		use_role = AH->use_role;
  
*************** setup_connection(Archive *AH, const char
*** 986,994 ****
  		ExecuteSqlStatement(AH, query->data);
  		destroyPQExpBuffer(query);
  
! 		/* save this for later use on parallel connections */
  		if (!AH->use_role)
! 			AH->use_role = strdup(use_role);
  	}
  
  	/* Set the datestyle to ISO to ensure the dump's portability */
--- 987,995 ----
  		ExecuteSqlStatement(AH, query->data);
  		destroyPQExpBuffer(query);
  
! 		/* save it for possible later use by parallel workers */
  		if (!AH->use_role)
! 			AH->use_role = pg_strdup(use_role);
  	}
  
  	/* Set the datestyle to ISO to ensure the dump's portability */
*************** setup_connection(Archive *AH, const char
*** 1074,1084 ****
  							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
  
  	/*
! 	 * define an export snapshot, either chosen by user or needed for parallel
! 	 * dump.
  	 */
  	if (dumpsnapshot)
! 		AH->sync_snapshot_id = strdup(dumpsnapshot);
  
  	if (AH->sync_snapshot_id)
  	{
--- 1075,1086 ----
  							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
  
  	/*
! 	 * If user specified a snapshot to use, select that.  In a parallel dump
! 	 * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
! 	 * is already set (if the server can handle it) and we should use that.
  	 */
  	if (dumpsnapshot)
! 		AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
  
  	if (AH->sync_snapshot_id)
  	{
*************** setup_connection(Archive *AH, const char
*** 1104,1113 ****
  	}
  }
  
  static void
! setupDumpWorker(Archive *AHX)
  {
! 	setup_connection(AHX, NULL, NULL, NULL);
  }
  
  static char *
--- 1106,1125 ----
  	}
  }
  
+ /* Set up connection for a parallel worker process */
  static void
! setupDumpWorker(Archive *AH)
  {
! 	/*
! 	 * We want to re-select all the same values the master connection is
! 	 * using.  We'll have inherited directly-usable values in
! 	 * AH->sync_snapshot_id and AH->use_role, but we need to translate the
! 	 * inherited encoding value back to a string to pass to setup_connection.
! 	 */
! 	setup_connection(AH,
! 					 pg_encoding_to_char(AH->encoding),
! 					 NULL,
! 					 NULL);
  }
  
  static char *
*************** get_synchronized_snapshot(Archive *fout)
*** 1118,1124 ****
  	PGresult   *res;
  
  	res = ExecuteSqlQueryForSingleRow(fout, query);
! 	result = strdup(PQgetvalue(res, 0, 0));
  	PQclear(res);
  
  	return result;
--- 1130,1136 ----
  	PGresult   *res;
  
  	res = ExecuteSqlQueryForSingleRow(fout, query);
! 	result = pg_strdup(PQgetvalue(res, 0, 0));
  	PQclear(res);
  
  	return result;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to