I wrote:
> In the attached patch for this, I took a middle ground of separating out
> the command and status string building and parsing functions.  There isn't
> presently any provision for letting archive format modules override these,
> but that could easily be added if we ever need it.  Meanwhile, this saves
> about 100 lines of rather messy code.

Attached are two versions of this patch.  The first is against current
HEAD (i.e., rebased over e652273e073566b6); the second is additionally
rebased over the patch I proposed at <6814.1464893...@sss.pgh.pa.us>,
resolving the merge conflicts between the two patches.

Since this is only refactoring, and doesn't (I think) fix any bugs,
I'm setting it aside till the next commitfest.

                        regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 7647271..9080102 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 20,49 ****
   * the desired number of worker processes, which each enter WaitForCommands().
   *
   * The master process dispatches an individual work item to one of the worker
!  * processes in DispatchJobForTocEntry().  That calls
!  * AH->MasterStartParallelItemPtr, a routine of the output format.  This
!  * function's arguments are the parents archive handle AH (containing the full
!  * catalog information), the TocEntry that the worker should work on and a
!  * T_Action value indicating whether this is a backup or a restore task.  The
!  * function simply converts the TocEntry assignment into a command string that
!  * is then sent over to the worker process. In the simplest case that would be
!  * something like "DUMP 1234", with 1234 being the TocEntry id.
!  *
   * The worker process receives and decodes the command and passes it to the
   * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
   * which are routines of the current archive format.  That routine performs
!  * the required action (dump or restore) and returns a malloc'd status string.
!  * The status string is passed back to the master where it is interpreted by
!  * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update state or catalog information on the master's side,
!  * depending on the reply from the worker process.  In the end it returns a
!  * status code, which is 0 for successful execution.
   *
!  * Remember that we have forked off the workers only after we have read in
!  * the catalog.  That's why our worker processes can also access the catalog
!  * information.  (In the Windows case, the workers are threads in the same
!  * process.  To avoid problems, they work with cloned copies of the Archive
!  * data structure; see RunWorker().)
   *
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
--- 20,41 ----
   * the desired number of worker processes, which each enter WaitForCommands().
   *
   * The master process dispatches an individual work item to one of the worker
!  * processes in DispatchJobForTocEntry().  We send a command string such as
!  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
   * The worker process receives and decodes the command and passes it to the
   * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
   * which are routines of the current archive format.  That routine performs
!  * the required action (dump or restore) and returns an integer status code.
!  * This is passed back to the master which updates its state accordingly.
   *
!  * In principle additional archive-format-specific information might be needed
!  * in commands or worker status responses, but so far that hasn't proved
!  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
!  * data structures.  Remember that we have forked off the workers only after
!  * we have read in the catalog.  That's why our worker processes can also
!  * access the catalog information.  (In the Windows case, the workers are
!  * threads in the same process.  To avoid problems, they work with cloned
!  * copies of the Archive data structure; see RunWorker().)
   *
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
*************** ParallelBackupEnd(ArchiveHandle *AH, Par
*** 1057,1062 ****
--- 1049,1158 ----
  }
  
  /*
+  * These next four functions handle construction and parsing of the command
+  * strings and response strings for parallel workers.
+  *
+  * Currently, these can be the same regardless of which archive format we are
+  * processing.  In future, we might want to let format modules override these
+  * functions to add format-specific data to a command or response.
+  */
+ 
+ /*
+  * buildWorkerCommand: format a command string to send to a worker.
+  *
+  * The string is built in the caller-supplied buffer of size buflen.
+  */
+ static void
+ buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
+ 				   char *buf, int buflen)
+ {
+ 	if (act == ACT_DUMP)
+ 		snprintf(buf, buflen, "DUMP %d", te->dumpId);
+ 	else if (act == ACT_RESTORE)
+ 		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
+ 	else
+ 		Assert(false);
+ }
+ 
+ /*
+  * parseWorkerCommand: interpret a command string in a worker.
+  */
+ static void
+ parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
+ 				   const char *msg)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes;
+ 
+ 	if (messageStartsWith(msg, "DUMP "))
+ 	{
+ 		*act = ACT_DUMP;
+ 		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
+ 		Assert(nBytes == strlen(msg));
+ 		*te = getTocEntryByDumpId(AH, dumpId);
+ 		Assert(*te != NULL);
+ 	}
+ 	else if (messageStartsWith(msg, "RESTORE "))
+ 	{
+ 		*act = ACT_RESTORE;
+ 		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
+ 		Assert(nBytes == strlen(msg));
+ 		*te = getTocEntryByDumpId(AH, dumpId);
+ 		Assert(*te != NULL);
+ 	}
+ 	else
+ 		exit_horribly(modulename,
+ 					  "unrecognized command received from master: \"%s\"\n",
+ 					  msg);
+ }
+ 
+ /*
+  * buildWorkerResponse: format a response string to send to the master.
+  *
+  * The string is built in the caller-supplied buffer of size buflen.
+  */
+ static void
+ buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
+ 					char *buf, int buflen)
+ {
+ 	snprintf(buf, buflen, "OK %d %d %d",
+ 			 te->dumpId,
+ 			 status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ }
+ 
+ /*
+  * parseWorkerResponse: parse the status message returned by a worker.
+  *
+  * Returns the integer status code, and may update fields of AH and/or te.
+  */
+ static int
+ parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
+ 					const char *msg)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes,
+ 				n_errors;
+ 	int			status = 0;
+ 
+ 	if (messageStartsWith(msg, "OK "))
+ 	{
+ 		sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(msg));
+ 
+ 		AH->public.n_errors += n_errors;
+ 	}
+ 	else
+ 		exit_horribly(modulename,
+ 					  "invalid message received from worker: \"%s\"\n",
+ 					  msg);
+ 
+ 	return status;
+ }
+ 
+ /*
   * Dispatch a job to some free worker (caller must ensure there is one!)
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
*************** DispatchJobForTocEntry(ArchiveHandle *AH
*** 1066,1083 ****
  					   T_Action act)
  {
  	int			worker;
! 	char	   *arg;
  
  	/* our caller makes sure that at least one worker is idle */
  	worker = GetIdleWorker(pstate);
  	Assert(worker != NO_SLOT);
  
  	/* Construct and send command string */
! 	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
! 
! 	sendMessageToWorker(pstate, worker, arg);
  
! 	/* XXX aren't we leaking string here? (no, because it's static. Ick.) */
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
--- 1162,1177 ----
  					   T_Action act)
  {
  	int			worker;
! 	char		buf[256];
  
  	/* our caller makes sure that at least one worker is idle */
  	worker = GetIdleWorker(pstate);
  	Assert(worker != NO_SLOT);
  
  	/* Construct and send command string */
! 	buildWorkerCommand(AH, te, act, buf, sizeof(buf));
  
! 	sendMessageToWorker(pstate, worker, buf);
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
*************** static void
*** 1193,1202 ****
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
  {
  	char	   *command;
- 	DumpId		dumpId;
- 	int			nBytes;
- 	char	   *str;
  	TocEntry   *te;
  
  	for (;;)
  	{
--- 1287,1296 ----
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
  {
  	char	   *command;
  	TocEntry   *te;
+ 	T_Action	act;
+ 	int			status = 0;
+ 	char		buf[256];
  
  	for (;;)
  	{
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 1206,1252 ****
  			return;
  		}
  
! 		if (messageStartsWith(command, "DUMP "))
! 		{
! 			/* Decode the command */
! 			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
! 			Assert(nBytes == strlen(command) - strlen("DUMP "));
! 			te = getTocEntryByDumpId(AH, dumpId);
! 			Assert(te != NULL);
  
  			/* Acquire lock on this table within the worker's session */
  			lockTableForWorker(AH, te);
  
  			/* Perform the dump command */
! 			str = (AH->WorkerJobDumpPtr) (AH, te);
! 
! 			/* Return status to master */
! 			sendMessageToMaster(pipefd, str);
! 
! 			/* we are responsible for freeing the status string */
! 			free(str);
  		}
! 		else if (messageStartsWith(command, "RESTORE "))
  		{
- 			/* Decode the command */
- 			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
- 			Assert(nBytes == strlen(command) - strlen("RESTORE "));
- 			te = getTocEntryByDumpId(AH, dumpId);
- 			Assert(te != NULL);
- 
  			/* Perform the restore command */
! 			str = (AH->WorkerJobRestorePtr) (AH, te);
! 
! 			/* Return status to master */
! 			sendMessageToMaster(pipefd, str);
! 
! 			/* we are responsible for freeing the status string */
! 			free(str);
  		}
  		else
! 			exit_horribly(modulename,
! 					   "unrecognized command received from master: \"%s\"\n",
! 						  command);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
  		free(command);
--- 1300,1328 ----
  			return;
  		}
  
! 		/* Decode the command */
! 		parseWorkerCommand(AH, &te, &act, command);
  
+ 		if (act == ACT_DUMP)
+ 		{
  			/* Acquire lock on this table within the worker's session */
  			lockTableForWorker(AH, te);
  
  			/* Perform the dump command */
! 			status = (AH->WorkerJobDumpPtr) (AH, te);
  		}
! 		else if (act == ACT_RESTORE)
  		{
  			/* Perform the restore command */
! 			status = (AH->WorkerJobRestorePtr) (AH, te);
  		}
  		else
! 			Assert(false);
! 
! 		/* Return status to master */
! 		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
! 
! 		sendMessageToMaster(pipefd, buf);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
  		free(command);
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 1259,1268 ****
   * If do_wait is true, wait to get a status message; otherwise, just return
   * immediately if there is none available.
   *
!  * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then save the resulting status code and switch the worker's state to
!  * WRKR_FINISHED.  Later, caller must call ReapWorkerStatus() to verify
!  * that the status was "OK" and push the worker back to IDLE state.
   *
   * XXX Rube Goldberg would be proud of this API, but no one else should be.
   *
--- 1335,1344 ----
   * If do_wait is true, wait to get a status message; otherwise, just return
   * immediately if there is none available.
   *
!  * When we get a status message, we save the resulting status code and
!  * switch the worker's state to WRKR_FINISHED.  Later, caller must call
!  * ReapWorkerStatus() to verify that the status was "OK" and push the worker
!  * back to IDLE state.
   *
   * XXX Rube Goldberg would be proud of this API, but no one else should be.
   *
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 1291,1316 ****
  	if (messageStartsWith(msg, "OK "))
  	{
  		TocEntry   *te = pstate->parallelSlot[worker].args->te;
- 		char	   *statusString;
  
! 		if (messageStartsWith(msg, "OK RESTORE "))
! 		{
! 			statusString = msg + strlen("OK RESTORE ");
! 			pstate->parallelSlot[worker].status =
! 				(AH->MasterEndParallelItemPtr)
! 				(AH, te, statusString, ACT_RESTORE);
! 		}
! 		else if (messageStartsWith(msg, "OK DUMP "))
! 		{
! 			statusString = msg + strlen("OK DUMP ");
! 			pstate->parallelSlot[worker].status =
! 				(AH->MasterEndParallelItemPtr)
! 				(AH, te, statusString, ACT_DUMP);
! 		}
! 		else
! 			exit_horribly(modulename,
! 						  "invalid message received from worker: \"%s\"\n",
! 						  msg);
  		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
  	}
  	else
--- 1367,1374 ----
  	if (messageStartsWith(msg, "OK "))
  	{
  		TocEntry   *te = pstate->parallelSlot[worker].args->te;
  
! 		pstate->parallelSlot[worker].status = parseWorkerResponse(AH, te, msg);
  		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
  	}
  	else
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 0376f2b..d969b38 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef void (*PrintTocDataPtr) (Archive
*** 162,173 ****
  typedef void (*ClonePtr) (ArchiveHandle *AH);
  typedef void (*DeClonePtr) (ArchiveHandle *AH);
  
! typedef char *(*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
! typedef char *(*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
! typedef char *(*MasterStartParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
! 														 T_Action act);
! typedef int (*MasterEndParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
! 											  const char *str, T_Action act);
  
  typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len);
  
--- 162,169 ----
  typedef void (*ClonePtr) (ArchiveHandle *AH);
  typedef void (*DeClonePtr) (ArchiveHandle *AH);
  
! typedef int (*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
! typedef int (*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
  
  typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len);
  
*************** struct _archiveHandle
*** 267,275 ****
  	StartBlobPtr StartBlobPtr;
  	EndBlobPtr EndBlobPtr;
  
- 	MasterStartParallelItemPtr MasterStartParallelItemPtr;
- 	MasterEndParallelItemPtr MasterEndParallelItemPtr;
- 
  	SetupWorkerPtr SetupWorkerPtr;
  	WorkerJobDumpPtr WorkerJobDumpPtr;
  	WorkerJobRestorePtr WorkerJobRestorePtr;
--- 263,268 ----
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 66329dc..1f29067 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 61,69 ****
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int	_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
! char	   *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
  
  typedef struct
  {
--- 61,67 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
  
  typedef struct
  {
*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 133,141 ****
  	AH->ClonePtr = _Clone;
  	AH->DeClonePtr = _DeClone;
  
- 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
- 
  	/* no parallel dump in the custom archive, only parallel restore */
  	AH->WorkerJobDumpPtr = NULL;
  	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
--- 131,136 ----
*************** _DeClone(ArchiveHandle *AH)
*** 808,884 ****
  }
  
  /*
!  * This function is executed in the child of a parallel backup for the
!  * custom format archive and dumps the actual data.
   */
! char *
  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
  {
- 	/*
- 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
- 	 * instead of static because we work with threads on windows
- 	 */
- 	const int	buflen = 64;
- 	char	   *buf = (char *) pg_malloc(buflen);
  	ParallelArgs pargs;
- 	int			status;
  
  	pargs.AH = AH;
  	pargs.te = te;
  
! 	status = parallel_restore(&pargs);
! 
! 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
! 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the parent process. Depending on the desired
!  * action (dump or restore) it creates a string that is understood by the
!  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
!  */
! static char *
! _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
! {
! 	/*
! 	 * A static char is okay here, even on Windows because we call this
! 	 * function only from one process (the master).
! 	 */
! 	static char buf[64];		/* short fixed-size string + number */
! 
! 	/* no parallel dump in the custom archive format */
! 	Assert(act == ACT_RESTORE);
! 
! 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the parent process. It analyzes the response of
!  * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
!  */
! static int
! _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
! {
! 	DumpId		dumpId;
! 	int			nBytes,
! 				status,
! 				n_errors;
! 
! 	/* no parallel dump in the custom archive */
! 	Assert(act == ACT_RESTORE);
! 
! 	sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
! 
! 	Assert(nBytes == strlen(str));
! 	Assert(dumpId == te->dumpId);
! 
! 	AH->public.n_errors += n_errors;
! 
! 	return status;
  }
  
  /*--------------------------------------------------
--- 803,820 ----
  }
  
  /*
!  * This function is executed in the child of a parallel restore from a
!  * custom-format archive and restores the actual data for one TOC entry.
   */
! static int
  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
  {
  	ParallelArgs pargs;
  
  	pargs.AH = AH;
  	pargs.te = te;
  
! 	return parallel_restore(&pargs);
  }
  
  /*--------------------------------------------------
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index e52f122..a06a4eb 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 89,99 ****
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
! 					   const char *str, T_Action act);
! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
  static void setFilePath(ArchiveHandle *AH, char *buf,
  			const char *relativeFilename);
--- 89,96 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static int	_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static int	_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
  static void setFilePath(ArchiveHandle *AH, char *buf,
  			const char *relativeFilename);
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 140,148 ****
  	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
  	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
  
- 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
- 
  	/* Set up our private context */
  	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
  	AH->formatData = (void *) ctx;
--- 137,142 ----
*************** _DeClone(ArchiveHandle *AH)
*** 754,874 ****
  }
  
  /*
!  * This function is executed in the parent process. Depending on the desired
!  * action (dump or restore) it creates a string that is understood by the
!  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
!  */
! static char *
! _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
! {
! 	/*
! 	 * A static char is okay here, even on Windows because we call this
! 	 * function only from one process (the master).
! 	 */
! 	static char buf[64];
! 
! 	if (act == ACT_DUMP)
! 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
! 	else if (act == ACT_RESTORE)
! 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the child of a parallel backup for the
!  * directory archive and dumps the actual data.
!  *
!  * We are currently returning only the DumpId so theoretically we could
!  * make this function returning an int (or a DumpId). However, to
!  * facilitate further enhancements and because sooner or later we need to
!  * convert this to a string and send it via a message anyway, we stick with
!  * char *. It is parsed on the other side by the _EndMasterParallel()
!  * function of the respective dump format.
   */
! static char *
  _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
  {
  	/*
- 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
- 	 * instead of static because we work with threads on windows
- 	 */
- 	const int	buflen = 64;
- 	char	   *buf = (char *) pg_malloc(buflen);
- 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
- 
- 	/* This should never happen */
- 	if (!tctx)
- 		exit_horribly(modulename, "error during backup\n");
- 
- 	/*
  	 * This function returns void. We either fail and die horribly or
  	 * succeed... A failure will be detected by the parent when the child dies
  	 * unexpectedly.
  	 */
  	WriteDataChunksForTocEntry(AH, te);
  
! 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
! 
! 	return buf;
  }
  
  /*
!  * This function is executed in the child of a parallel backup for the
!  * directory archive and dumps the actual data.
   */
! static char *
  _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
  {
- 	/*
- 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
- 	 * instead of static because we work with threads on windows
- 	 */
- 	const int	buflen = 64;
- 	char	   *buf = (char *) pg_malloc(buflen);
  	ParallelArgs pargs;
- 	int			status;
  
  	pargs.AH = AH;
  	pargs.te = te;
  
! 	status = parallel_restore(&pargs);
! 
! 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
! 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the parent process. It analyzes the response of
!  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
!  * respective dump format.
!  */
! static int
! _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
! {
! 	DumpId		dumpId;
! 	int			nBytes,
! 				n_errors;
! 	int			status = 0;
! 
! 	if (act == ACT_DUMP)
! 	{
! 		sscanf(str, "%d%n", &dumpId, &nBytes);
! 
! 		Assert(dumpId == te->dumpId);
! 		Assert(nBytes == strlen(str));
! 	}
! 	else if (act == ACT_RESTORE)
! 	{
! 		sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
! 
! 		Assert(dumpId == te->dumpId);
! 		Assert(nBytes == strlen(str));
! 
! 		AH->public.n_errors += n_errors;
! 	}
! 
! 	return status;
  }
--- 748,780 ----
  }
  
  /*
!  * This function is executed in the child of a parallel backup for a
!  * directory-format archive and dumps the actual data for one TOC entry.
   */
! static int
  _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
  {
  	/*
  	 * This function returns void. We either fail and die horribly or
  	 * succeed... A failure will be detected by the parent when the child dies
  	 * unexpectedly.
  	 */
  	WriteDataChunksForTocEntry(AH, te);
  
! 	return 0;
  }
  
  /*
!  * This function is executed in the child of a parallel restore from a
!  * directory-format archive and restores the actual data for one TOC entry.
   */
! static int
  _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
  {
  	ParallelArgs pargs;
  
  	pargs.AH = AH;
  	pargs.te = te;
  
! 	return parallel_restore(&pargs);
  }
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 8dfc6a9..9cadd0c 100644
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
*************** InitArchiveFmt_Tar(ArchiveHandle *AH)
*** 152,160 ****
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
- 	AH->MasterStartParallelItemPtr = NULL;
- 	AH->MasterEndParallelItemPtr = NULL;
- 
  	AH->WorkerJobDumpPtr = NULL;
  	AH->WorkerJobRestorePtr = NULL;
  
--- 152,157 ----
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index a9671ca..61af54e 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 20,51 ****
   * the desired number of worker processes, which each enter WaitForCommands().
   *
   * The master process dispatches an individual work item to one of the worker
!  * processes in DispatchJobForTocEntry().  That calls
!  * AH->MasterStartParallelItemPtr, a routine of the output format.  This
!  * function's arguments are the parents archive handle AH (containing the full
!  * catalog information), the TocEntry that the worker should work on and a
!  * T_Action value indicating whether this is a backup or a restore task.  The
!  * function simply converts the TocEntry assignment into a command string that
!  * is then sent over to the worker process. In the simplest case that would be
!  * something like "DUMP 1234", with 1234 being the TocEntry id.
!  *
   * The worker process receives and decodes the command and passes it to the
   * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
   * which are routines of the current archive format.  That routine performs
!  * the required action (dump or restore) and returns a malloc'd status string.
!  * The status string is passed back to the master where it is interpreted by
!  * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update format-specific information on the master's side,
!  * depending on the reply from the worker process.  In the end it returns a
!  * status code, which we pass to the ParallelCompletionPtr callback function
!  * that was passed to DispatchJobForTocEntry().  The callback function does
!  * state updating for the master control logic in pg_backup_archiver.c.
   *
!  * Remember that we have forked off the workers only after we have read in
!  * the catalog.  That's why our worker processes can also access the catalog
!  * information.  (In the Windows case, the workers are threads in the same
!  * process.  To avoid problems, they work with cloned copies of the Archive
!  * data structure; see RunWorker().)
   *
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
--- 20,44 ----
   * the desired number of worker processes, which each enter WaitForCommands().
   *
   * The master process dispatches an individual work item to one of the worker
!  * processes in DispatchJobForTocEntry().  We send a command string such as
!  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
   * The worker process receives and decodes the command and passes it to the
   * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
   * which are routines of the current archive format.  That routine performs
!  * the required action (dump or restore) and returns an integer status code.
!  * This is passed back to the master where we pass it to the
!  * ParallelCompletionPtr callback function that was passed to
!  * DispatchJobForTocEntry().  The callback function does state updating
!  * for the master control logic in pg_backup_archiver.c.
   *
!  * In principle additional archive-format-specific information might be needed
!  * in commands or worker status responses, but so far that hasn't proved
!  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
!  * data structures.  Remember that we have forked off the workers only after
!  * we have read in the catalog.  That's why our worker processes can also
!  * access the catalog information.  (In the Windows case, the workers are
!  * threads in the same process.  To avoid problems, they work with cloned
!  * copies of the Archive data structure; see RunWorker().)
   *
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
*************** ParallelBackupEnd(ArchiveHandle *AH, Par
*** 1060,1065 ****
--- 1053,1162 ----
  }
  
  /*
+  * These next four functions handle construction and parsing of the command
+  * strings and response strings for parallel workers.
+  *
+  * Currently, these can be the same regardless of which archive format we are
+  * processing.  In future, we might want to let format modules override these
+  * functions to add format-specific data to a command or response.
+  */
+ 
+ /*
+  * buildWorkerCommand: format a command string to send to a worker.
+  *
+  * The string is built in the caller-supplied buffer of size buflen.
+  */
+ static void
+ buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
+ 				   char *buf, int buflen)
+ {
+ 	if (act == ACT_DUMP)
+ 		snprintf(buf, buflen, "DUMP %d", te->dumpId);
+ 	else if (act == ACT_RESTORE)
+ 		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
+ 	else
+ 		Assert(false);
+ }
+ 
+ /*
+  * parseWorkerCommand: interpret a command string in a worker.
+  */
+ static void
+ parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
+ 				   const char *msg)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes;
+ 
+ 	if (messageStartsWith(msg, "DUMP "))
+ 	{
+ 		*act = ACT_DUMP;
+ 		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
+ 		Assert(nBytes == strlen(msg));
+ 		*te = getTocEntryByDumpId(AH, dumpId);
+ 		Assert(*te != NULL);
+ 	}
+ 	else if (messageStartsWith(msg, "RESTORE "))
+ 	{
+ 		*act = ACT_RESTORE;
+ 		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
+ 		Assert(nBytes == strlen(msg));
+ 		*te = getTocEntryByDumpId(AH, dumpId);
+ 		Assert(*te != NULL);
+ 	}
+ 	else
+ 		exit_horribly(modulename,
+ 					  "unrecognized command received from master: \"%s\"\n",
+ 					  msg);
+ }
+ 
+ /*
+  * buildWorkerResponse: format a response string to send to the master.
+  *
+  * The string is built in the caller-supplied buffer of size buflen.
+  */
+ static void
+ buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
+ 					char *buf, int buflen)
+ {
+ 	snprintf(buf, buflen, "OK %d %d %d",
+ 			 te->dumpId,
+ 			 status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ }
+ 
+ /*
+  * parseWorkerResponse: parse the status message returned by a worker.
+  *
+  * Returns the integer status code, and adds the worker's reported error
+  * count to AH->public.n_errors.  A message that is not exactly of the form
+  * "OK dumpId status n_errors" is fatal, even in non-assert builds.
+  */
+ static int
+ parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
+ 					const char *msg)
+ {
+ 	DumpId		dumpId = 0;
+ 	int			nBytes = 0;
+ 	int			n_errors = 0;
+ 	int			status = 0;
+ 	int			nFields = 0;
+ 
+ 	if (messageStartsWith(msg, "OK "))
+ 		nFields = sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+ 	/* %n is stored only if all three integers were converted */
+ 	if (nFields != 3 || msg[nBytes] != '\0')
+ 		exit_horribly(modulename,
+ 					  "invalid message received from worker: \"%s\"\n",
+ 					  msg);
+ 
+ 	Assert(dumpId == te->dumpId);
+ 	AH->public.n_errors += n_errors;
+ 
+ 	return status;
+ }
+ 
+ /*
   * Dispatch a job to some free worker.
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
*************** DispatchJobForTocEntry(ArchiveHandle *AH
*** 1077,1094 ****
  					   void *callback_data)
  {
  	int			worker;
! 	char	   *arg;
  
  	/* Get a worker, waiting if none are idle */
  	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
  		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
  
  	/* Construct and send command string */
! 	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
! 
! 	sendMessageToWorker(pstate, worker, arg);
  
! 	/* XXX aren't we leaking string here? (no, because it's static. Ick.) */
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
--- 1174,1189 ----
  					   void *callback_data)
  {
  	int			worker;
! 	char		buf[256];
  
  	/* Get a worker, waiting if none are idle */
  	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
  		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
  
  	/* Construct and send command string */
! 	buildWorkerCommand(AH, te, act, buf, sizeof(buf));
  
! 	sendMessageToWorker(pstate, worker, buf);
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
*************** static void
*** 1206,1215 ****
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
  {
  	char	   *command;
- 	DumpId		dumpId;
- 	int			nBytes;
- 	char	   *str;
  	TocEntry   *te;
  
  	for (;;)
  	{
--- 1301,1310 ----
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
  {
  	char	   *command;
  	TocEntry   *te;
+ 	T_Action	act;
+ 	int			status = 0;
+ 	char		buf[256];
  
  	for (;;)
  	{
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 1219,1265 ****
  			return;
  		}
  
! 		if (messageStartsWith(command, "DUMP "))
! 		{
! 			/* Decode the command */
! 			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
! 			Assert(nBytes == strlen(command) - strlen("DUMP "));
! 			te = getTocEntryByDumpId(AH, dumpId);
! 			Assert(te != NULL);
  
  			/* Acquire lock on this table within the worker's session */
  			lockTableForWorker(AH, te);
  
  			/* Perform the dump command */
! 			str = (AH->WorkerJobDumpPtr) (AH, te);
! 
! 			/* Return status to master */
! 			sendMessageToMaster(pipefd, str);
! 
! 			/* we are responsible for freeing the status string */
! 			free(str);
  		}
! 		else if (messageStartsWith(command, "RESTORE "))
  		{
- 			/* Decode the command */
- 			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
- 			Assert(nBytes == strlen(command) - strlen("RESTORE "));
- 			te = getTocEntryByDumpId(AH, dumpId);
- 			Assert(te != NULL);
- 
  			/* Perform the restore command */
! 			str = (AH->WorkerJobRestorePtr) (AH, te);
! 
! 			/* Return status to master */
! 			sendMessageToMaster(pipefd, str);
! 
! 			/* we are responsible for freeing the status string */
! 			free(str);
  		}
  		else
! 			exit_horribly(modulename,
! 					   "unrecognized command received from master: \"%s\"\n",
! 						  command);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
  		free(command);
--- 1314,1342 ----
  			return;
  		}
  
! 		/* Decode the command */
! 		parseWorkerCommand(AH, &te, &act, command);
  
+ 		if (act == ACT_DUMP)
+ 		{
  			/* Acquire lock on this table within the worker's session */
  			lockTableForWorker(AH, te);
  
  			/* Perform the dump command */
! 			status = (AH->WorkerJobDumpPtr) (AH, te);
  		}
! 		else if (act == ACT_RESTORE)
  		{
  			/* Perform the restore command */
! 			status = (AH->WorkerJobRestorePtr) (AH, te);
  		}
  		else
! 			Assert(false);
! 
! 		/* Return status to master */
! 		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
! 
! 		sendMessageToMaster(pipefd, buf);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
  		free(command);
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 1272,1280 ****
   * If do_wait is true, wait to get a status message; otherwise, just return
   * immediately if there is none available.
   *
!  * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then pass the resulting status code to the callback function that was
!  * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
   *
   * Returns true if we collected a status message, else false.
   *
--- 1349,1357 ----
   * If do_wait is true, wait to get a status message; otherwise, just return
   * immediately if there is none available.
   *
!  * When we get a status message, we pass the status code to the callback
!  * function that was specified to DispatchJobForTocEntry, then reset the
!  * worker status to IDLE.
   *
   * Returns true if we collected a status message, else false.
   *
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 1304,1332 ****
  	{
  		ParallelSlot *slot = &pstate->parallelSlot[worker];
  		TocEntry   *te = slot->te;
- 		char	   *statusString;
  		int			status;
  
! 		if (messageStartsWith(msg, "OK RESTORE "))
! 		{
! 			statusString = msg + strlen("OK RESTORE ");
! 			status =
! 				(AH->MasterEndParallelItemPtr)
! 				(AH, te, statusString, ACT_RESTORE);
! 			slot->callback(AH, te, status, slot->callback_data);
! 		}
! 		else if (messageStartsWith(msg, "OK DUMP "))
! 		{
! 			statusString = msg + strlen("OK DUMP ");
! 			status =
! 				(AH->MasterEndParallelItemPtr)
! 				(AH, te, statusString, ACT_DUMP);
! 			slot->callback(AH, te, status, slot->callback_data);
! 		}
! 		else
! 			exit_horribly(modulename,
! 						  "invalid message received from worker: \"%s\"\n",
! 						  msg);
  		slot->workerStatus = WRKR_IDLE;
  		slot->te = NULL;
  	}
--- 1381,1390 ----
  	{
  		ParallelSlot *slot = &pstate->parallelSlot[worker];
  		TocEntry   *te = slot->te;
  		int			status;
  
! 		status = parseWorkerResponse(AH, te, msg);
! 		slot->callback(AH, te, status, slot->callback_data);
  		slot->workerStatus = WRKR_IDLE;
  		slot->te = NULL;
  	}
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 1350,1357 ****
   * WFW_ONE_IDLE: wait for at least one worker to be idle
   * WFW_ALL_IDLE: wait for all workers to be idle
   *
!  * Any received results are passed to MasterEndParallelItemPtr and then
!  * to the callback specified to DispatchJobForTocEntry.
   *
   * This function is executed in the master process.
   */
--- 1408,1415 ----
   * WFW_ONE_IDLE: wait for at least one worker to be idle
   * WFW_ALL_IDLE: wait for all workers to be idle
   *
!  * Any received results are passed to the callback specified to
!  * DispatchJobForTocEntry.
   *
   * This function is executed in the master process.
   */
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 123aa5d..97d34a5 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef void (*PrintTocDataPtr) (Archive
*** 161,172 ****
  typedef void (*ClonePtr) (ArchiveHandle *AH);
  typedef void (*DeClonePtr) (ArchiveHandle *AH);
  
! typedef char *(*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
! typedef char *(*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
! typedef char *(*MasterStartParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
! 														 T_Action act);
! typedef int (*MasterEndParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
! 											  const char *str, T_Action act);
  
  typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len);
  
--- 161,168 ----
  typedef void (*ClonePtr) (ArchiveHandle *AH);
  typedef void (*DeClonePtr) (ArchiveHandle *AH);
  
! typedef int (*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
! typedef int (*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
  
  typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len);
  
*************** struct _archiveHandle
*** 266,274 ****
  	StartBlobPtr StartBlobPtr;
  	EndBlobPtr EndBlobPtr;
  
- 	MasterStartParallelItemPtr MasterStartParallelItemPtr;
- 	MasterEndParallelItemPtr MasterEndParallelItemPtr;
- 
  	SetupWorkerPtr SetupWorkerPtr;
  	WorkerJobDumpPtr WorkerJobDumpPtr;
  	WorkerJobRestorePtr WorkerJobRestorePtr;
--- 262,267 ----
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index c4f487a..5388c08 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 61,69 ****
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int	_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
! char	   *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
  
  typedef struct
  {
--- 61,67 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
  
  typedef struct
  {
*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 133,141 ****
  	AH->ClonePtr = _Clone;
  	AH->DeClonePtr = _DeClone;
  
- 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
- 
  	/* no parallel dump in the custom archive, only parallel restore */
  	AH->WorkerJobDumpPtr = NULL;
  	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
--- 131,136 ----
*************** _DeClone(ArchiveHandle *AH)
*** 808,880 ****
  }
  
  /*
!  * This function is executed in the child of a parallel backup for the
!  * custom format archive and dumps the actual data.
!  */
! char *
! _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
! {
! 	/*
! 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
! 	 * instead of static because we work with threads on windows
! 	 */
! 	const int	buflen = 64;
! 	char	   *buf = (char *) pg_malloc(buflen);
! 	int			status;
! 
! 	status = parallel_restore(AH, te);
! 
! 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
! 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the parent process. Depending on the desired
!  * action (dump or restore) it creates a string that is understood by the
!  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
!  */
! static char *
! _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
! {
! 	/*
! 	 * A static char is okay here, even on Windows because we call this
! 	 * function only from one process (the master).
! 	 */
! 	static char buf[64];		/* short fixed-size string + number */
! 
! 	/* no parallel dump in the custom archive format */
! 	Assert(act == ACT_RESTORE);
! 
! 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the parent process. It analyzes the response of
!  * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
   */
  static int
! _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
  {
! 	DumpId		dumpId;
! 	int			nBytes,
! 				status,
! 				n_errors;
! 
! 	/* no parallel dump in the custom archive */
! 	Assert(act == ACT_RESTORE);
! 
! 	sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
! 
! 	Assert(nBytes == strlen(str));
! 	Assert(dumpId == te->dumpId);
! 
! 	AH->public.n_errors += n_errors;
! 
! 	return status;
  }
  
  /*--------------------------------------------------
--- 803,815 ----
  }
  
  /*
!  * This function is executed in the child of a parallel restore from a
!  * custom-format archive and restores the actual data for one TOC entry.
   */
  static int
! _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
  {
! 	return parallel_restore(AH, te);
  }
  
  /*--------------------------------------------------
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 8071259..ae44371 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 89,99 ****
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
! 					   const char *str, T_Action act);
! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
  static void setFilePath(ArchiveHandle *AH, char *buf,
  			const char *relativeFilename);
--- 89,96 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static int	_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static int	_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
  static void setFilePath(ArchiveHandle *AH, char *buf,
  			const char *relativeFilename);
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 140,148 ****
  	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
  	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
  
- 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
- 
  	/* Set up our private context */
  	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
  	AH->formatData = (void *) ctx;
--- 137,142 ----
*************** _DeClone(ArchiveHandle *AH)
*** 754,870 ****
  }
  
  /*
!  * This function is executed in the parent process. Depending on the desired
!  * action (dump or restore) it creates a string that is understood by the
!  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
!  */
! static char *
! _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
! {
! 	/*
! 	 * A static char is okay here, even on Windows because we call this
! 	 * function only from one process (the master).
! 	 */
! 	static char buf[64];
! 
! 	if (act == ACT_DUMP)
! 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
! 	else if (act == ACT_RESTORE)
! 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the child of a parallel backup for the
!  * directory archive and dumps the actual data.
!  *
!  * We are currently returning only the DumpId so theoretically we could
!  * make this function returning an int (or a DumpId). However, to
!  * facilitate further enhancements and because sooner or later we need to
!  * convert this to a string and send it via a message anyway, we stick with
!  * char *. It is parsed on the other side by the _EndMasterParallel()
!  * function of the respective dump format.
   */
! static char *
  _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
  {
  	/*
- 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
- 	 * instead of static because we work with threads on windows
- 	 */
- 	const int	buflen = 64;
- 	char	   *buf = (char *) pg_malloc(buflen);
- 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
- 
- 	/* This should never happen */
- 	if (!tctx)
- 		exit_horribly(modulename, "error during backup\n");
- 
- 	/*
  	 * This function returns void. We either fail and die horribly or
  	 * succeed... A failure will be detected by the parent when the child dies
  	 * unexpectedly.
  	 */
  	WriteDataChunksForTocEntry(AH, te);
  
! 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
! 
! 	return buf;
! }
! 
! /*
!  * This function is executed in the child of a parallel backup for the
!  * directory archive and dumps the actual data.
!  */
! static char *
! _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
! {
! 	/*
! 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
! 	 * instead of static because we work with threads on windows
! 	 */
! 	const int	buflen = 64;
! 	char	   *buf = (char *) pg_malloc(buflen);
! 	int			status;
! 
! 	status = parallel_restore(AH, te);
! 
! 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
! 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
! 
! 	return buf;
  }
  
  /*
!  * This function is executed in the parent process. It analyzes the response of
!  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
!  * respective dump format.
   */
  static int
! _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
  {
! 	DumpId		dumpId;
! 	int			nBytes,
! 				n_errors;
! 	int			status = 0;
! 
! 	if (act == ACT_DUMP)
! 	{
! 		sscanf(str, "%d%n", &dumpId, &nBytes);
! 
! 		Assert(dumpId == te->dumpId);
! 		Assert(nBytes == strlen(str));
! 	}
! 	else if (act == ACT_RESTORE)
! 	{
! 		sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
! 
! 		Assert(dumpId == te->dumpId);
! 		Assert(nBytes == strlen(str));
! 
! 		AH->public.n_errors += n_errors;
! 	}
! 
! 	return status;
  }
--- 748,775 ----
  }
  
  /*
!  * This function is executed in the child of a parallel backup for a
!  * directory-format archive and dumps the actual data for one TOC entry.
   */
! static int
  _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
  {
  	/*
  	 * This function returns void. We either fail and die horribly or
  	 * succeed... A failure will be detected by the parent when the child dies
  	 * unexpectedly.
  	 */
  	WriteDataChunksForTocEntry(AH, te);
  
! 	return 0;
  }
  
  /*
!  * This function is executed in the child of a parallel restore from a
!  * directory-format archive and restores the actual data for one TOC entry.
   */
  static int
! _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
  {
! 	return parallel_restore(AH, te);
  }
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 8dfc6a9..9cadd0c 100644
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
*************** InitArchiveFmt_Tar(ArchiveHandle *AH)
*** 152,160 ****
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
- 	AH->MasterStartParallelItemPtr = NULL;
- 	AH->MasterEndParallelItemPtr = NULL;
- 
  	AH->WorkerJobDumpPtr = NULL;
  	AH->WorkerJobRestorePtr = NULL;
  
--- 152,157 ----
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to