On 10/13/2012 10:46 PM, Andrew Dunstan wrote:

On 09/17/2012 10:01 PM, Joachim Wieland wrote:
On Mon, Jun 18, 2012 at 10:05 PM, Joachim Wieland <j...@mcknight.de> wrote:
Attached is a rebased version of the parallel pg_dump patch.
Attached is another rebased version for the current commitfest.

These did not apply cleanly, but I have fixed them up. The combined diff against git tip is attached. It can also be pulled from my parallel_dump branch on <https://github.com/adunstan/postgresql-dev.git>. This builds and runs OK on Linux, which is a start ...


Well, you would also need this piece if you're applying the patch (sometimes I forget to do git add ...)

cheers


andrew

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
new file mode 100644
index 0000000..65bc8bb
--- /dev/null
+++ b/src/bin/pg_dump/parallel.c
@@ -0,0 +1,1283 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *
+ *	Parallel support for the pg_dump archiver
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	The author is not responsible for loss or damages that may
+ *	result from its use.
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_dump/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "pg_backup_db.h"
+
+#include "dumpmem.h"
+#include "dumputils.h"
+#include "parallel.h"
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/wait.h>
+#include "signal.h"
+#include <unistd.h>
+#include <fcntl.h>
+#endif
+
+#define PIPE_READ							0	/* index of the read end in an int[2] pipe pair */
+#define PIPE_WRITE							1	/* index of the write end in an int[2] pipe pair */
+
+/* file-scope variables */
+#ifdef WIN32
+static unsigned int	tMasterThreadId = 0;	/* set in ParallelBackupStart() */
+static HANDLE		termEvent = INVALID_HANDLE_VALUE;	/* polled by checkAborting() */
+static int pgpipe(int handles[2]);
+static int piperead(int s, char *buf, int len);
+#define pipewrite(a,b,c)	send(a,b,c,0)
+#else
+/*
+ * wantAbort is set by sigTermHandler(). aborting is only ever used in the
+ * master: its exit handler sets it so that communication errors are ignored.
+ */
+static bool aborting = false;
+static volatile sig_atomic_t wantAbort = 0;
+#define pgpipe(a)			pipe(a)
+#define piperead(a,b,c)		read(a,b,c)
+#define pipewrite(a,b,c)	write(a,b,c)
+#endif
+
+typedef struct ShutdownInformation
+{
+    ParallelState *pstate;	/* set by ParallelBackupStart(), cleared by ParallelBackupEnd() */
+    Archive       *AHX;		/* registered by on_exit_close_archive() */
+} ShutdownInformation;
+
+static ShutdownInformation shutdown_info;	/* argument handed to archive_close_connection() */
+
+static const char *modulename = gettext_noop("parallel archiver");
+
+static ParallelSlot *GetMyPSlot(ParallelState *pstate);
+static void parallel_exit_msg_func(const char *modulename,
+								   const char *fmt, va_list ap)
+			__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
+static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
+								const char *fmt, va_list ap)
+			__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
+static void archive_close_connection(int code, void *arg);
+static void ShutdownWorkersHard(ParallelState *pstate);
+static void WaitForTerminatingWorkers(ParallelState *pstate);
+#ifndef WIN32
+static void sigTermHandler(int signum);
+#endif
+static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+						RestoreOptions *ropt);
+static bool HasEveryWorkerTerminated(ParallelState *pstate);
+
+static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
+static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+static char *getMessageFromMaster(int pipefd[2]);
+static void sendMessageToMaster(int pipefd[2], const char *str);
+static int select_loop(int maxFd, fd_set *workerset);
+static char *getMessageFromWorker(ParallelState *pstate,
+								  bool do_wait, int *worker);
+static void sendMessageToWorker(ParallelState *pstate,
+							    int worker, const char *str);
+static char *readMessageFromPipe(int fd);
+
+#define messageStartsWith(msg, prefix) \
+	(strncmp(msg, prefix, strlen(prefix)) == 0)	/* true if msg begins with prefix */
+#define messageEquals(msg, pattern) \
+	(strcmp(msg, pattern) == 0)	/* true if msg is exactly pattern */
+
+/* Return the slot owned by the calling worker; the master gets NULL. */
+static ParallelSlot *
+GetMyPSlot(ParallelState *pstate)
+{
+	int			slotno = -1;
+
+	while (++slotno < pstate->numWorkers)
+#ifdef WIN32
+		if (pstate->parallelSlot[slotno].threadId == GetCurrentThreadId())
+#else
+		if (pstate->parallelSlot[slotno].pid == getpid())
+#endif
+			return pstate->parallelSlot + slotno;
+	return NULL;
+}
+
+/*
+ * Message hook that ParallelBackupStart() installs as on_exit_msg_func.
+ *
+ * exit_horribly() calls it to emit the error message. The master just prints
+ * the message. A worker forwards its last words to the master instead; the
+ * master then does exit_horribly() with that message itself, prints it
+ * normally and shuts down the remaining worker processes.
+ */
+static void
+parallel_exit_msg_func(const char *modulename, const char *fmt, va_list ap)
+{
+	ParallelSlot *mySlot;
+
+	Assert(shutdown_info.pstate);
+	mySlot = GetMyPSlot(shutdown_info.pstate);
+
+	if (mySlot != NULL)
+	{
+		/* we own a slot, so we are a worker: hand the message to the master */
+		parallel_msg_master(mySlot, modulename, fmt, ap);
+		return;
+	}
+	/* no slot of our own: we're the parent, just write the message out */
+	vwrite_msg(modulename, fmt, ap);
+}
+
+/* Sends the error message from the worker to the master process */
+static void
+parallel_msg_master(ParallelSlot *slot, const char *modulename,
+					const char *fmt, va_list ap)
+{
+	char		buf[512];
+	int			pipefd[2];
+
+	pipefd[PIPE_READ] = slot->pipeRevRead;
+	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+	strcpy(buf, "ERROR ");
+	vsnprintf(buf + strlen("ERROR "),
+			  sizeof(buf) - strlen("ERROR "), fmt, ap);
+
+	sendMessageToMaster(pipefd, buf);
+}
+
+/*
+ * Called by pg_dump and pg_restore: stores the Archive pointer in
+ * shutdown_info and registers archive_close_connection() as an exit handler
+ * (called from exit_horribly), so that shutdown_info can stay file-private.
+ */
+void
+on_exit_close_archive(Archive *AHX)
+{
+	shutdown_info.AHX = AHX;
+	on_exit_nicely(archive_close_connection, &shutdown_info);
+}
+
+/*
+ * Exit handler that closes database connections, for both the parallel and
+ * the non-parallel case. In parallel mode the master also stops its workers.
+ */
+static void
+archive_close_connection(int code, void *arg)
+{
+	ShutdownInformation *info = (ShutdownInformation *) arg;
+	ParallelSlot *mySlot;
+
+	if (info->pstate == NULL)
+	{
+		/* not running in parallel: only our own connection, if registered */
+		if (info->AHX)
+			DisconnectDatabase(info->AHX);
+		return;
+	}
+
+	mySlot = GetMyPSlot(info->pstate);
+	if (mySlot != NULL)
+	{
+		/* worker: close the cloned archive's connection, if we have one */
+		if (mySlot->args->AH)
+			DisconnectDatabase(&(mySlot->args->AH->public));
+		return;
+	}
+
+	/*
+	 * Master: the message passed to exit_horribly() (our own or a worker's)
+	 * has already been printed. Close our own connection (only open during
+	 * parallel dump, not restore) and then shut down the remaining workers.
+	 */
+	DisconnectDatabase(info->AHX);
+#ifndef WIN32
+	aborting = true;	/* best effort from now on: ignore comm errors */
+#endif
+	ShutdownWorkersHard(info->pstate);
+}
+
+/*
+ * If we have one worker that terminates for some reason, we'd like the other
+ * workers to terminate as well (and not finish with their 70 GB table dump
+ * first...). On Unix the master signals the workers and sigTermHandler() sets
+ * wantAbort to 1; on Windows the master sets termEvent instead. Either way
+ * this function exits via exit_horribly() once the flag is seen. AH is unused.
+ */
+void
+checkAborting(ArchiveHandle *AH)
+{
+#ifdef WIN32
+	if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
+#else
+	if (wantAbort)
+#endif
+		exit_horribly(modulename, "worker is terminating\n");
+}
+
+/*
+ * Shut down any remaining workers and wait for them to exit. Unix: close our
+ * write ends and send SIGTERM. Windows: set termEvent (see checkAborting()).
+ */
+static void
+ShutdownWorkersHard(ParallelState *pstate)
+{
+#ifndef WIN32
+	int i;
+	signal(SIGPIPE, SIG_IGN);
+	for (i = 0; i < pstate->numWorkers; i++)
+	{
+		/*
+		 * A slot that was never forked still has pid 0 from the memset():
+		 * there is no fd to close, nothing to wait for, and kill(0, ...)
+		 * would signal our whole process group.
+		 */
+		if (pstate->parallelSlot[i].pid <= 0)
+			pstate->parallelSlot[i].workerStatus = WRKR_TERMINATED;
+		else
+		{
+			closesocket(pstate->parallelSlot[i].pipeWrite);
+			kill(pstate->parallelSlot[i].pid, SIGTERM);
+		}
+	}
+#else
+	SetEvent(termEvent);
+#endif
+	WaitForTerminatingWorkers(pstate);
+}
+
+/*
+ * Wait, using the OS-specific method, until every worker has terminated.
+ */
+static void
+WaitForTerminatingWorkers(ParallelState *pstate)
+{
+	while (!HasEveryWorkerTerminated(pstate))
+	{
+		ParallelSlot *slot = NULL;
+		int j;
+#ifndef WIN32
+		int		status;
+		pid_t	pid = wait(&status);
+		if (pid < 0 && errno == EINTR)
+			continue;			/* interrupted by a signal, just retry */
+		if (pid < 0)
+			break;				/* e.g. ECHILD: no child left to reap */
+		for (j = 0; j < pstate->numWorkers; j++)
+			if (pstate->parallelSlot[j].pid == pid)
+				slot = &(pstate->parallelSlot[j]);
+#else
+		DWORD	ret;
+		uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+		int nrun = 0;
+		for (j = 0; j < pstate->numWorkers; j++)
+			if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+			{
+				lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+				nrun++;
+			}
+		ret = WaitForMultipleObjects(nrun, (HANDLE*) lpHandles, false, INFINITE);
+		for (j = 0; ret != WAIT_FAILED && j < pstate->numWorkers; j++)
+			if (pstate->parallelSlot[j].hThread == lpHandles[ret - WAIT_OBJECT_0])
+				slot = &(pstate->parallelSlot[j]);
+		free(lpHandles);
+		if (ret == WAIT_FAILED)
+			break;				/* cannot wait any longer; give up */
+#endif
+		if (slot == NULL)
+			continue;			/* whatever ended was not one of our workers */
+		slot->workerStatus = WRKR_TERMINATED;
+	}
+}
+
+#ifndef WIN32
+/* Handler installed for SIGTERM, SIGINT and SIGQUIT (UNIX only) */
+static void
+sigTermHandler(int signum)
+{
+	wantAbort = 1;		/* polled by checkAborting() and select_loop() */
+}
+#endif
+
+/*
+ * Body of a worker, shared by the Unix (fork) and Windows (thread) variants:
+ * set up, serve commands in WaitForCommands(), then close our pipe ends.
+ */
+static void
+SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+			RestoreOptions *ropt)
+{
+	/*
+	 * In dump mode (pg_dump) this calls _SetupWorker() as defined in
+	 * pg_dump.c, while in restore mode (pg_restore) it calls _SetupWorker()
+	 * as defined in pg_restore.c.
+	 *
+	 * We get the raw connection only so that we can close it properly when
+	 * we are brought down because of an error (see
+	 * archive_close_connection()). The worker argument is not used here.
+	 */
+	_SetupWorker((Archive *) AH, ropt);
+
+	Assert(AH->connection != NULL);
+
+	WaitForCommands(AH, pipefd);
+
+	closesocket(pipefd[PIPE_READ]);
+	closesocket(pipefd[PIPE_WRITE]);
+}
+
+#ifdef WIN32
+/*
+ * _beginthreadex() passes only one parameter to the thread function, so
+ * ParallelBackupStart() packs everything a worker needs into a WorkerInfo.
+ * init_spawned_worker_win32() below is that thread function; it copies the
+ * fields out and then frees the structure.
+ */
+typedef struct {
+	ArchiveHandle  *AH;
+	RestoreOptions *ropt;
+	int				worker;
+	int				pipeRead;
+	int				pipeWrite;
+} WorkerInfo;
+
+static unsigned __stdcall
+init_spawned_worker_win32(WorkerInfo *wi)
+{
+	ArchiveHandle *AH;
+	int pipefd[2] = { wi->pipeRead, wi->pipeWrite };
+	int worker = wi->worker;
+	RestoreOptions *ropt = wi->ropt;
+
+	AH = CloneArchive(wi->AH);
+
+	free(wi);					/* every field has been copied out above */
+	SetupWorker(AH, pipefd, worker, ropt);
+
+	DeCloneArchive(AH);
+	_endthreadex(0);
+	return 0;
+}
+#endif
+
+/*
+ * Spawn the workers for a parallel dump or restore: threads on Windows,
+ * fork()ed processes on Unix. With one worker nothing is spawned. Failing to
+ * set up a worker is fatal (exit_horribly). Returns the new ParallelState.
+ */
+ParallelState *
+ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
+{
+	ParallelState  *pstate;
+	int				i;
+	const size_t	slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
+
+	Assert(AH->public.numWorkers > 0);
+
+	/* Ensure stdio state is quiesced before forking */
+	fflush(NULL);
+
+	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+
+	pstate->numWorkers = AH->public.numWorkers;
+	pstate->parallelSlot = NULL;
+
+	if (AH->public.numWorkers == 1)
+		return pstate;
+
+	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+	memset((void *) pstate->parallelSlot, 0, slotSize);
+
+	/*
+	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
+	 * set and falls back to AHX otherwise.
+	 */
+	shutdown_info.pstate = pstate;
+	on_exit_msg_func = parallel_exit_msg_func;
+
+#ifdef WIN32
+	tMasterThreadId = GetCurrentThreadId();
+	termEvent = CreateEvent(NULL, true, false, NULL);	/* unnamed: a named event is shared between processes */
+#else
+	signal(SIGTERM, sigTermHandler);
+	signal(SIGINT, sigTermHandler);
+	signal(SIGQUIT, sigTermHandler);
+#endif
+
+	for (i = 0; i < pstate->numWorkers; i++)
+	{
+#ifdef WIN32
+		WorkerInfo *wi;
+		uintptr_t	handle;
+#else
+		pid_t		pid;
+#endif
+		int			pipeMW[2], pipeWM[2];
+
+		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+		{
+			pstate->numWorkers = i;	/* exit handler: only started slots */
+			exit_horribly(modulename,
+						  "Cannot create communication channels: %s\n",
+						  strerror(errno));
+		}
+
+		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+		pstate->parallelSlot[i].args->AH = NULL;
+		pstate->parallelSlot[i].args->te = NULL;
+#ifdef WIN32
+		/* Allocate a new structure for every worker */
+		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+		wi->ropt = ropt;
+		wi->worker = i;
+		wi->AH = AH;
+		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
+		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
+								wi, 0, &(pstate->parallelSlot[i].threadId));
+		if (handle == 0)
+		{
+			pstate->numWorkers = i;	/* exit handler: only started slots */
+			exit_horribly(modulename, "could not create worker thread: %s\n",
+						  strerror(errno));
+		}
+		pstate->parallelSlot[i].hThread = handle;
+#else
+		pid = fork();
+		if (pid == 0)
+		{
+			/* we are the worker */
+			int j;
+			int pipefd[2] = { pipeMW[PIPE_READ], pipeWM[PIPE_WRITE] };
+
+			/*
+			 * Record our pipe ends and pid in our process-local copy of
+			 * pstate. We use them only in case of an error.
+			 */
+			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+			pstate->parallelSlot[i].pid = getpid();
+
+			/*
+			 * fork() already gave us our own copy of AH, but CloneArchive
+			 * also resets the state information and clones the database
+			 * connection (for parallel dump), so call it on Unix as well.
+			 */
+			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
+
+			/* close read end of Worker -> Master */
+			closesocket(pipeWM[PIPE_READ]);
+			/* close write end of Master -> Worker */
+			closesocket(pipeMW[PIPE_WRITE]);
+
+			/* close the inherited fds the master uses for the other workers */
+			for (j = 0; j < i; j++)
+			{
+				closesocket(pstate->parallelSlot[j].pipeRead);
+				closesocket(pstate->parallelSlot[j].pipeWrite);
+			}
+
+			SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
+			exit(0);
+		}
+		else if (pid < 0)
+		{
+			pstate->numWorkers = i;	/* fork failed; only started slots */
+			exit_horribly(modulename,
+						  "could not create worker process: %s\n",
+						  strerror(errno));
+		}
+
+		/* we are the Master, pid > 0 here */
+		Assert(pid > 0);
+
+		/* close read end of Master -> Worker */
+		closesocket(pipeMW[PIPE_READ]);
+		/* close write end of Worker -> Master */
+		closesocket(pipeWM[PIPE_WRITE]);
+
+		pstate->parallelSlot[i].pid = pid;
+#endif
+
+		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+	}
+
+	return pstate;
+}
+
+/*
+ * Tell all of our workers to terminate and release the parallel state.
+ *
+ * All workers must be idle. We close both pipe ends of every worker so that
+ * the workers know they can exit, wait for them to terminate, and then free
+ * the per-slot args together with pstate. Does nothing for a single worker.
+ */
+void
+ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+{
+	int i;
+
+	if (pstate->numWorkers == 1)
+		return;
+
+	Assert(IsEveryWorkerIdle(pstate));
+
+	/* close the sockets so that the workers know they can exit */
+	for (i = 0; i < pstate->numWorkers; i++)
+	{
+		closesocket(pstate->parallelSlot[i].pipeRead);
+		closesocket(pstate->parallelSlot[i].pipeWrite);
+	}
+	WaitForTerminatingWorkers(pstate);
+
+	/* From here on the exit handler falls back to closing AH->connection. */
+	shutdown_info.pstate = NULL;
+
+	/* args were allocated per slot in ParallelBackupStart(); free them too */
+	for (i = 0; i < pstate->numWorkers; i++)
+		free(pstate->parallelSlot[i].args);
+	free(pstate->parallelSlot);
+	free(pstate);
+}
+
+
+/*
+ * The sequence is the following (for dump, similar for restore):
+ *
+ * The master process starts the parallel backup in ParallelBackupStart(),
+ * which forks the worker processes; these then enter WaitForCommands().
+ *
+ * The master process dispatches an individual work item to one of the worker
+ * processes in DispatchJobForTocEntry(). It calls
+ * AH->MasterStartParallelItemPtr, a routine of the output format. This
+ * function's arguments are the parent's archive handle AH (containing the full
+ * catalog information), the TocEntry that the worker should work on and a
+ * T_Action act indicating whether this is a backup or a restore item.  The
+ * function then converts the TocEntry assignment into a string that is then
+ * sent over to the worker process. In the simplest case that would be
+ * something like "DUMP 1234", with 1234 being the TocEntry id.
+ *
+ * The worker receives the message in the routine pointed to by
+ * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
+ * corresponding routines of the respective output format, e.g.
+ * _WorkerJobDumpDirectory().
+ *
+ * Remember that we have forked off the workers only after we have read in the
+ * catalog. That's why our worker processes can also access the catalog
+ * information. Now they re-translate the textual representation to a TocEntry
+ * on their side and do the required action (restore or dump).
+ *
+ * The result is again a textual string that is sent back to the master and is
+ * interpreted by AH->MasterEndParallelItemPtr. This function can update state
+ * or catalog information on the master's side, depending on the reply from
+ * the worker process. In the end it returns status which is 0 for successful
+ * execution.
+ *
+ * ---------------------------------------------------------------------
+ * Master                                   Worker
+ *
+ *                                          enters WaitForCommands()
+ * DispatchJobForTocEntry(...te...)
+ *
+ * [ Worker is IDLE ]
+ *
+ * arg = (MasterStartParallelItemPtr)()
+ * send: DUMP arg
+ *                                          receive: DUMP arg
+ *                                          str = (WorkerJobDumpPtr)(arg)
+ * [ Worker is WORKING ]                    ... gets te from arg ...
+ *                                          ... dump te ...
+ *                                          send: OK DUMP info
+ *
+ * In ListenToWorkers():
+ *
+ * [ Worker is FINISHED ]
+ * receive: OK DUMP info
+ * status = (MasterEndParallelItemPtr)(info)
+ *
+ * In ReapWorkerStatus(&ptr):
+ * *ptr = status;
+ * [ Worker is IDLE ]
+ * ---------------------------------------------------------------------
+ */
+void
+DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
+					   T_Action act)
+{
+	/* our caller makes sure that at least one worker is idle */
+	const int	slotno = GetIdleWorker(pstate);
+	ParallelSlot *target;
+	char	   *request;
+
+	Assert(slotno != NO_SLOT);
+	target = &(pstate->parallelSlot[slotno]);
+
+	/* the archive format turns the work item into a command string */
+	request = (AH->MasterStartParallelItemPtr)(AH, te, act);
+	sendMessageToWorker(pstate, slotno, request);
+
+	target->workerStatus = WRKR_WORKING;
+	target->args->te = te;		/* read back by ListenToWorkers() */
+}
+
+/*
+ * Return the index of the lowest-numbered WRKR_IDLE slot, or NO_SLOT if none.
+ */
+int
+GetIdleWorker(ParallelState *pstate)
+{
+	int			slotno = 0;
+	while (slotno < pstate->numWorkers &&
+		   pstate->parallelSlot[slotno].workerStatus != WRKR_IDLE)
+		slotno++;
+	return (slotno < pstate->numWorkers) ? slotno : NO_SLOT;
+}
+
+/*
+ * Return true iff no worker slot is in a state other than WRKR_TERMINATED.
+ */
+static bool
+HasEveryWorkerTerminated(ParallelState *pstate)
+{
+	int			remaining = pstate->numWorkers;
+	while (remaining-- > 0)
+		if (pstate->parallelSlot[remaining].workerStatus != WRKR_TERMINATED)
+			return false;
+	return true;
+}
+
+/*
+ * Return true iff the number of workers not in the WRKR_IDLE state is zero.
+ */
+bool
+IsEveryWorkerIdle(ParallelState *pstate)
+{
+	int			nBusy = 0;
+	int			slotno;
+	for (slotno = 0; slotno < pstate->numWorkers; slotno++)
+		nBusy += (pstate->parallelSlot[slotno].workerStatus != WRKR_IDLE);
+	return nBusy == 0;
+}
+
+/*
+ * ---------------------------------------------------------------------
+ * One danger of the parallel backup is a possible deadlock:
+ *
+ * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
+ * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
+ *    because the master holds a conflicting ACCESS SHARE lock).
+ * 3) The worker process also requests an ACCESS SHARE lock to read the table.
+ *    The worker's not granted that lock but is enqueued behind the ACCESS
+ *    EXCLUSIVE lock request.
+ * ---------------------------------------------------------------------
+ *
+ * So the worker requests its ACCESS SHARE lock with NOWAIT before touching
+ * the table named by te. If that fails, somebody else has requested an ACCESS
+ * EXCLUSIVE lock and we fail the whole backup via exit_horribly() instead of
+ * deadlocking. Not finding exactly one pg_class row for te is fatal as well.
+ */
+static void
+lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+{
+	Archive *AHX = (Archive *) AH;
+	const char *qualId;
+	PQExpBuffer query = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(AH->format == archDirectory);
+	Assert(strcmp(te->desc, "BLOBS") != 0);
+
+	appendPQExpBuffer(query,
+					  "SELECT pg_namespace.nspname,"
+					  "       pg_class.relname "
+					  "  FROM pg_class "
+					  "  JOIN pg_namespace on pg_namespace.oid = relnamespace "
+					  " WHERE pg_class.oid = %u", te->catalogId.oid);
+
+	res = PQexec(AH->connection, query->data);
+
+	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+		exit_horribly(modulename,
+					  "could not get relation name for oid %u: %s\n",
+					  te->catalogId.oid, PQerrorMessage(AH->connection));
+
+	resetPQExpBuffer(query);
+
+	qualId = fmtQualifiedId(AHX->remoteVersion,
+							PQgetvalue(res, 0, 0),
+							PQgetvalue(res, 0, 1));
+
+	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
+					  qualId);
+	PQclear(res);
+
+	res = PQexec(AH->connection, query->data);
+
+	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+		exit_horribly(modulename,
+					  "could not obtain lock on relation \"%s\". This "
+					  "usually means that someone requested an ACCESS EXCLUSIVE lock "
+					  "on the table after the pg_dump parent process has gotten the "
+					  "initial ACCESS SHARE lock on the table.\n", qualId);
+
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * That's the main routine for the worker.
+ * When it starts up it enters this routine and waits for commands from the
+ * master process. After having processed a command it comes back to here to
+ * wait for the next command. When no further message can be read from the
+ * master, it closes its database connection and returns.
+ */
+static void
+WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+{
+	char	   *command;
+	DumpId		dumpId;
+	int			nBytes;
+	char	   *str = NULL;
+	TocEntry   *te;
+
+	for(;;)
+	{
+		if (!(command = getMessageFromMaster(pipefd)))
+		{
+			PQfinish(AH->connection);
+			AH->connection = NULL;
+			return;
+		}
+
+		if (messageStartsWith(command, "DUMP "))
+		{
+			Assert(AH->format == archDirectory);
+			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
+			Assert(nBytes == strlen(command) - strlen("DUMP "));
+
+			te = getTocEntryByDumpId(AH, dumpId);
+			Assert(te != NULL);
+
+			/*
+			 * Lock the table but with NOWAIT. Note that the parent is already
+			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
+			 * lock, then somebody else has requested an exclusive lock in the
+			 * meantime.  lockTableNoWait dies in this case to prevent a
+			 * deadlock.
+			 */
+			if (strcmp(te->desc, "BLOBS") != 0)
+				lockTableNoWait(AH, te);
+
+			/*
+			 * The message we return here has been pg_malloc()ed and we are
+			 * responsible for free()ing it.
+			 */
+			str = (AH->WorkerJobDumpPtr)(AH, te);
+			Assert(AH->connection != NULL);
+			sendMessageToMaster(pipefd, str);
+			free(str);
+		}
+		else if (messageStartsWith(command, "RESTORE "))
+		{
+			Assert(AH->format == archDirectory || AH->format == archCustom);
+			Assert(AH->connection != NULL);
+
+			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
+			Assert(nBytes == strlen(command) - strlen("RESTORE "));
+
+			te = getTocEntryByDumpId(AH, dumpId);
+			Assert(te != NULL);
+			/* as for DUMP, the returned message is ours to free() */
+			str = (AH->WorkerJobRestorePtr)(AH, te);
+			Assert(AH->connection != NULL);
+			sendMessageToMaster(pipefd, str);
+			free(str);
+		}
+		else
+			exit_horribly(modulename,
+						  "Unknown command on communication channel: %s\n",
+						  command);
+
+		/* every message is read into freshly pg_malloc()ed space */
+		free(command);
+	}
+}
+
+/*
+ * ---------------------------------------------------------------------
+ * Note the status change:
+ *
+ * DispatchJobForTocEntry		WRKR_IDLE -> WRKR_WORKING
+ * ListenToWorkers				WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+ * ReapWorkerStatus				WRKR_FINISHED -> WRKR_IDLE
+ * ---------------------------------------------------------------------
+ *
+ * Read one message from the workers (blocking only if do_wait) and record the
+ * result in the sender's slot. A worker only becomes reusable after
+ * ListenToWorkers() has seen its reply and ReapWorkerStatus() has then
+ * collected the status (=result) of the worker's execution.
+ */
+void
+ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+{
+	int			sender;
+	char	   *reply;
+
+	reply = getMessageFromWorker(pstate, do_wait, &sender);
+	if (reply == NULL)
+	{
+		/* nothing was read: fatal only if we were asked to wait */
+		if (do_wait)
+			exit_horribly(modulename, "A worker process died unexpectedly\n");
+		return;
+	}
+
+	if (messageStartsWith(reply, "ERROR "))
+	{
+		/* the worker's last words: repeat them and give up */
+		Assert(AH->format == archDirectory || AH->format == archCustom);
+		pstate->parallelSlot[sender].workerStatus = WRKR_TERMINATED;
+		exit_horribly(modulename, "%s", reply + strlen("ERROR "));
+	}
+	else if (messageStartsWith(reply, "OK "))
+	{
+		ParallelSlot *slot = &(pstate->parallelSlot[sender]);
+		const char *prefix = NULL;
+		T_Action	act = ACT_DUMP;
+
+		slot->workerStatus = WRKR_FINISHED;
+
+		/* work out which kind of item the worker has completed */
+		if (messageStartsWith(reply, "OK RESTORE "))
+		{
+			prefix = "OK RESTORE ";
+			act = ACT_RESTORE;
+		}
+		else if (messageStartsWith(reply, "OK DUMP "))
+			prefix = "OK DUMP ";
+
+		if (prefix == NULL)
+			exit_horribly(modulename,
+						  "Invalid message received from worker: %s\n", reply);
+		else
+			slot->status = (AH->MasterEndParallelItemPtr)
+				(AH, slot->args->te, reply + strlen(prefix), act);
+	}
+	else
+		exit_horribly(modulename, "Invalid message received from worker: %s\n", reply);
+
+	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
+	free(reply);
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It collects the result of one finished work item: the first slot found in
+ * WRKR_FINISHED state gets its status copied to *status and is made idle
+ * again, and its index is returned. NO_SLOT means there was no such slot.
+ */
+int
+ReapWorkerStatus(ParallelState *pstate, int *status)
+{
+	int			slotno;
+
+	for (slotno = 0; slotno < pstate->numWorkers; slotno++)
+	{
+		ParallelSlot *slot = &(pstate->parallelSlot[slotno]);
+		if (slot->workerStatus != WRKR_FINISHED)
+			continue;
+		*status = slot->status;
+		slot->status = 0;
+		slot->workerStatus = WRKR_IDLE;
+		return slotno;
+	}
+	return NO_SLOT;
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It returns only once at least one worker is idle. On the way it collects
+ * the results of finished work items; a non-zero result is fatal. If nobody
+ * is idle it blocks in ListenToWorkers() until a worker reports back.
+ */
+void
+EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
+{
+	int		work_status;
+
+	for (;;)
+	{
+		bool	reapedAny = false;
+
+		/* turn every finished worker back into an idle one */
+		while (ReapWorkerStatus(pstate, &work_status) != NO_SLOT)
+		{
+			if (work_status != 0)
+				exit_horribly(modulename, "Error processing a parallel work item.\n");
+
+			reapedAny = true;
+		}
+
+		/*
+		 * We need an idle worker before the next item can be dispatched. A
+		 * worker we have just reaped is idle (quick check); failing that,
+		 * look for one explicitly.
+		 */
+		if (reapedAny || GetIdleWorker(pstate) != NO_SLOT)
+			return;
+
+		/*
+		 * Nobody is idle: read the result of one or more workers and loop
+		 * around to call ReapWorkerStatus() on them.
+		 */
+		ListenToWorkers(AH, pstate, true);
+	}
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It waits until all workers are idle again; a failed work item is fatal.
+ */
+void
+EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
+{
+	int			work_status;
+
+	if (pstate == NULL || pstate->numWorkers == 1)
+		return;
+
+	for (;;)
+	{
+		if (IsEveryWorkerIdle(pstate))
+			return;
+		if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
+			ListenToWorkers(AH, pstate, true);	/* nothing finished yet */
+		else if (work_status != 0)
+			exit_horribly(modulename, "Error processing a parallel work item\n");
+	}
+}
+
+/*
+ * This function is executed in the worker process.
+ *
+ * It returns the next message read from pipefd[PIPE_READ], blocking until it
+ * becomes available. WaitForCommands() treats a NULL result as "shut down".
+ */
+static char *
+getMessageFromMaster(int pipefd[2])
+{
+	return readMessageFromPipe(pipefd[PIPE_READ]);
+}
+
+/*
+ * This function is executed in the worker process. It sends str (including
+ * the terminating NUL byte) to the master on pipefd[PIPE_WRITE]. A failed or
+ * incomplete write is fatal.
+ */
+static void
+sendMessageToMaster(int pipefd[2], const char *str)
+{
+	int			len = strlen(str) + 1;
+
+	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
+		exit_horribly(modulename,
+					  "Error writing to the communication channel: %s\n",
+					  strerror(errno));
+}
+
+/*
+ * A select loop that repeats calling select until a descriptor in the read
+ * set becomes readable or select fails. Windows uses a 250 ms timeout and
+ * retries on WSAEINTR; Unix blocks and retries on EINTR unless aborting.
+ */
+#ifdef WIN32
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+	int			i;
+	fd_set		saveSet = *workerset;
+
+	/* should always be the master */
+	Assert(tMasterThreadId == GetCurrentThreadId());
+
+	for (;;)
+	{
+		/*
+		 * Wake up every quarter of a second. NOTE(review): nothing here
+		 * actually checks termEvent after a timeout - confirm the intent.
+		 */
+		struct timeval tv = { 0, 250000 };
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+
+		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+			continue;
+		if (i)
+			break;
+	}
+
+	return i;
+}
+#else /* UNIX */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+	int		i;
+
+	fd_set saveSet = *workerset;
+	for (;;)
+	{
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+
+		/*
+		 * If we Ctrl-C the master process, it's likely that we interrupt
+		 * select() here. The signal handler will set wantAbort to 1 and the
+		 * shutdown journey starts from here. Note that we'll come back
+		 * here later when we tell all workers to terminate and read their
+		 * responses. But then we have aborting set to true.
+		 */
+		if (wantAbort && !aborting)
+			exit_horribly(modulename, "terminated by user\n");
+
+		if (i < 0 && errno == EINTR)
+			continue;
+		break;
+	}
+
+	return i;
+}
+#endif
+
+/*
+ * This function is executed in the master process.
+ *
+ * It returns the next message from a worker on the communication channel,
+ * optionally blocking (do_wait) until one becomes available. Without do_wait
+ * it returns NULL if no worker has anything to say. The id of the sending
+ * worker is returned in *worker. A select() failure is fatal.
+ */
+static char *
+getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
+{
+	int			i;
+	fd_set		workerset;
+	int			maxFd = -1;
+	struct		timeval nowait = { 0, 0 };
+
+	FD_ZERO(&workerset);
+
+	for (i = 0; i < pstate->numWorkers; i++)
+	{
+		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+			continue;
+		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
+		/* actually WIN32 ignores the first parameter to select()... */
+		if (pstate->parallelSlot[i].pipeRead > maxFd)
+			maxFd = pstate->parallelSlot[i].pipeRead;
+	}
+
+	if (do_wait)
+	{
+		i = select_loop(maxFd, &workerset);
+		Assert(i != 0);
+	}
+	else
+	{
+		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
+			return NULL;
+	}
+
+	if (i < 0)
+		exit_horribly(modulename, "Error in ListenToWorkers(): %s\n", strerror(errno));
+
+	for (i = 0; i < pstate->numWorkers; i++)
+	{
+		char	   *msg;
+
+		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
+			continue;
+
+		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
+		*worker = i;
+		return msg;
+	}
+	Assert(false);
+	return NULL;
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It sends a message to a certain worker on the communication channel.
+ */
+static void
+sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
+{
+	int			fd = pstate->parallelSlot[worker].pipeWrite;
+	int			nbytes = strlen(str) + 1;	/* payload plus terminating NUL */
+
+	if (pipewrite(fd, str, nbytes) == nbytes)
+		return;
+
+#ifndef WIN32
+	/* once aborting, a failed write is fine: the child might be gone */
+	if (aborting)
+		return;
+#endif
+
+	exit_horribly(modulename,
+				  "Error writing to the communication channel: %s\n",
+				  strerror(errno));
+}
+
+/*
+ * Read one message from the communication channel (fd). Returns it in an
+ * allocated buffer, or NULL (errno preserved) if the peer closed the
+ * connection or the read failed. A read could deliver a partial message or
+ * several at once, so we read byte by byte up to the terminating (char) 0:
+ * slow, but this is only the command channel and it avoids per-fd state.
+ */
+static char *
+readMessageFromPipe(int fd)
+{
+	char	   *msg;
+	int			msgsize, bufsize;
+	int			ret;
+
+	bufsize = 64;  /* could be any number */
+	msg = (char *) pg_malloc(bufsize);
+
+	msgsize = 0;
+	for (;;)
+	{
+		Assert(msgsize < bufsize);
+		ret = piperead(fd, msg + msgsize, 1);
+
+		/* worker has closed the connection or another error happened */
+		if (ret <= 0)
+		{
+			int			save_errno = errno;
+
+			free(msg);			/* don't leak the partial message */
+			errno = save_errno;
+			return NULL;
+		}
+
+		Assert(ret == 1);
+
+		if (msg[msgsize] == '\0')
+			return msg;
+
+		msgsize++;
+		if (msgsize == bufsize)
+		{
+			char	   *bigger = (char *) realloc(msg, bufsize + 16);
+
+			if (bigger == NULL)
+				exit_horribly(modulename, "out of memory\n");
+			msg = bigger;
+			bufsize += 16;		/* could be any number */
+		}
+	}
+}
+
+#ifdef WIN32
+/*
+ * Replacement for pipe() on Win32: a connected pair of loopback TCP sockets,
+ * usable in select(); read/write calls must be replaced with recv/send.
+ * Returns 0, or -1 after reporting the error and closing what was opened.
+ */
+static int
+pgpipe(int handles[2])
+{
+	SOCKET		s;
+	struct sockaddr_in serv_addr;
+	int			len = sizeof(serv_addr);
+
+	handles[0] = handles[1] = INVALID_SOCKET;
+
+	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+	{
+		write_msg(modulename, "pgpipe could not create socket: %d\n",
+				  WSAGetLastError());
+		return -1;
+	}
+	memset((void *) &serv_addr, 0, sizeof(serv_addr));
+	serv_addr.sin_family = AF_INET;
+	serv_addr.sin_port = htons(0);		/* let the system pick a free port */
+	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+	{
+		write_msg(modulename, "pgpipe could not bind: %d\n",
+				  WSAGetLastError());
+		closesocket(s);
+		return -1;
+	}
+	if (listen(s, 1) == SOCKET_ERROR)
+	{
+		write_msg(modulename, "pgpipe could not listen: %d\n",
+				  WSAGetLastError());
+		closesocket(s);
+		return -1;
+	}
+	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
+	{
+		write_msg(modulename, "pgpipe could not getsockname: %d\n",
+				  WSAGetLastError());
+		closesocket(s);
+		return -1;
+	}
+	if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+	{
+		write_msg(modulename, "pgpipe could not create socket 2: %d\n",
+				  WSAGetLastError());
+		closesocket(s);
+		return -1;
+	}
+	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+	{
+		write_msg(modulename, "pgpipe could not connect socket: %d\n",
+				  WSAGetLastError());
+		closesocket(handles[1]);	/* don't leak the connecting socket */
+		handles[1] = INVALID_SOCKET;
+		closesocket(s);
+		return -1;
+	}
+	if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET)
+	{
+		write_msg(modulename, "pgpipe could not accept socket: %d\n",
+				  WSAGetLastError());
+		closesocket(handles[1]);
+		handles[1] = INVALID_SOCKET;
+		closesocket(s);
+		return -1;
+	}
+	closesocket(s);				/* the listening socket is no longer needed */
+	return 0;
+}
+
+static int
+piperead(int s, char *buf, int len)
+{
+	int			nread = recv(s, buf, len, 0);
+
+	/* win32 socket based implementation: a reset connection is EOF on the pipe */
+	if (nread >= 0 || WSAGetLastError() != WSAECONNRESET)
+		return nread;
+	return 0;
+}
+#endif
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
new file mode 100644
index 0000000..3eafe2f
--- /dev/null
+++ b/src/bin/pg_dump/parallel.h
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ *
+ *	Parallel support header file for the pg_dump archiver
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	The author is not responsible for loss or damages that may
+ *	result from its use.
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_dump/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "pg_backup_db.h"
+
+struct _archiveHandle;
+struct _tocEntry;
+
+typedef enum
+{
+	WRKR_TERMINATED = 0,		/* such slots are skipped when polling for messages */
+	WRKR_IDLE,
+	WRKR_WORKING,
+	WRKR_FINISHED
+} T_WorkerStatus;				/* kept in ParallelSlot.workerStatus */
+
+typedef enum T_Action
+{
+	ACT_DUMP,
+	ACT_RESTORE,
+} T_Action;						/* the "act" argument of DispatchJobForTocEntry() */
+
+/* Arguments needed for a worker process */
+typedef struct ParallelArgs
+{
+	struct _archiveHandle *AH;
+	struct _tocEntry	  *te;		/* presumably the TOC entry to process -- confirm */
+} ParallelArgs;
+
+/* State for each parallel activity slot */
+typedef struct ParallelSlot
+{
+	ParallelArgs	   *args;
+	T_WorkerStatus		workerStatus;	/* WRKR_TERMINATED slots are not polled */
+	int					status;
+	int					pipeRead;		/* master reads this worker's messages here */
+	int					pipeWrite;		/* master writes messages for the worker here */
+	int					pipeRevRead;	/* presumably the worker's ends of the two */
+	int					pipeRevWrite;	/* channels -- TODO confirm in parallel.c */
+#ifdef WIN32
+	uintptr_t			hThread;
+	unsigned int		threadId;
+#else
+	pid_t				pid;
+#endif
+} ParallelSlot;
+
+#define NO_SLOT (-1)
+
+typedef struct ParallelState
+{
+	int			numWorkers;		/* number of entries in parallelSlot[] */
+	ParallelSlot *parallelSlot;	/* one slot per worker, indexed by worker id */
+} ParallelState;
+
+extern int GetIdleWorker(ParallelState *pstate);
+extern bool IsEveryWorkerIdle(ParallelState *pstate);
+extern void ListenToWorkers(struct _archiveHandle *AH, ParallelState *pstate, bool do_wait);
+extern int ReapWorkerStatus(ParallelState *pstate, int *status);
+extern void EnsureIdleWorker(struct _archiveHandle *AH, ParallelState *pstate);
+extern void EnsureWorkersFinished(struct _archiveHandle *AH, ParallelState *pstate);
+
+extern ParallelState *ParallelBackupStart(struct _archiveHandle *AH,
+										  RestoreOptions *ropt);
+extern void DispatchJobForTocEntry(struct _archiveHandle *AH,
+								   ParallelState *pstate,
+								   struct _tocEntry *te, T_Action act);
+extern void ParallelBackupEnd(struct _archiveHandle *AH, ParallelState *pstate);
+
+extern void checkAborting(struct _archiveHandle *AH);
+
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to