On 2017-06-06 12:53:21 -0700, Andres Freund wrote:
> On 2017-06-06 15:48:42 -0400, Robert Haas wrote:
> > On Fri, Jun 2, 2017 at 3:24 PM, Andres Freund <and...@anarazel.de> wrote:
> > > Latches work in single user mode, it's just that the new code for some
> > > reason uses uninitialized memory as the latch.  As I pointed out above,
> > > the new code really should just use MyLatch instead of
> > > MyProc->procLatch.
> 
> FWIW, I'd misremembered some code here, and we actually reach the
> function initializing the shared latch, even in single user mode.
> 
> 
> > We seem to have accumulated quite a few instances of that.
> > 
> > [rhaas pgsql]$ git grep MyLatch | wc -l
> >      116
> > [rhaas pgsql]$ git grep 'MyProc->procLatch' | wc -l
> >       33
> > 
> > Most of the offenders are in src/backend/replication, but there are
> > some that are related to parallelism as well (bgworker.c, pqmq.c,
> > parallel.c, condition_variable.c).  Maybe we (you?) should just go and
> > change them all.  I don't think using MyLatch instead of
> > MyProc->procLatch has become automatic for everyone yet.
> 
> Nevertheless this should be changed.  Will do.

Here's the patch for that, also addressing some issues I found while
updating those callsites (separate thread started, too).

- Andres
From 9206ced1dc05d3a9cc99faafa22d5d8b16d998d1 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Tue, 6 Jun 2017 16:13:00 -0700
Subject: [PATCH] Clean up latch related code.

The larger part of this patch replaces usages of MyProc->procLatch
with MyLatch.  The latter works even early during backend startup,
where MyProc->procLatch doesn't work yet.  While the affected code
shouldn't run in cases where MyProc->procLatch is not initialized, it
might get copied into places where that can happen.  Using MyLatch is
simpler and a bit faster to boot, so there's little point in sticking
with the previous coding.

While doing so I noticed some weaknesses around newly introduced uses
of latches that could lead to missed events, and an omitted
CHECK_FOR_INTERRUPTS() call in worker_spi.

As all the actual bugs are in v10 code, there doesn't seem to be
sufficient reason to backpatch this.

Author: Andres Freund
Discussion:
    https://postgr.es/m/20170606195321.sjmenrfgl2nu6...@alap3.anarazel.de
    https://postgr.es/m/20170606210405.sim3yl6vpudhm...@alap3.anarazel.de
Backpatch: -
---
 src/backend/access/transam/parallel.c              |  4 +--
 src/backend/libpq/pqmq.c                           |  4 +--
 src/backend/postmaster/bgworker.c                  |  4 +--
 .../libpqwalreceiver/libpqwalreceiver.c            | 13 ++++----
 src/backend/replication/logical/launcher.c         | 35 +++++++++++++++-------
 src/backend/replication/logical/tablesync.c        | 12 ++++----
 src/backend/replication/logical/worker.c           | 10 +++++--
 src/backend/storage/lmgr/condition_variable.c      |  6 ++--
 src/test/modules/worker_spi/worker_spi.c           |  2 ++
 9 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index cb22174270..16a0bee610 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 		if (!anyone_alive)
 			break;
 
-		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1,
+		WaitLatch(MyLatch, WL_LATCH_SET, -1,
 				  WAIT_EVENT_PARALLEL_FINISH);
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 	}
 
 	if (pcxt->toc != NULL)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 96939327c3..8fbc03819d 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 		if (result != SHM_MQ_WOULD_BLOCK)
 			break;
 
-		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0,
+		WaitLatch(MyLatch, WL_LATCH_SET, 0,
 				  WAIT_EVENT_MQ_PUT_MESSAGE);
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 		CHECK_FOR_INTERRUPTS();
 	}
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index c3454276bf..712d700481 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
 		if (status == BGWH_STOPPED)
 			break;
 
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
 					   WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
 			break;
 		}
 
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 	}
 
 	return status;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 726d1b5bd8..89c34b8225 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				   ? WL_SOCKET_READABLE
 				   : WL_SOCKET_WRITEABLE);
 
-		rc = WaitLatchOrSocket(&MyProc->procLatch,
+		rc = WaitLatchOrSocket(MyLatch,
 							   WL_POSTMASTER_DEATH |
 							   WL_LATCH_SET | io_flag,
 							   PQsocket(conn->streamConn),
@@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		/* Interrupted? */
 		if (rc & WL_LATCH_SET)
 		{
-			ResetLatch(&MyProc->procLatch);
+			ResetLatch(MyLatch);
 			CHECK_FOR_INTERRUPTS();
 		}
 
@@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 			 * the signal arrives in the middle of establishment of
 			 * replication connection.
 			 */
-			ResetLatch(&MyProc->procLatch);
-			rc = WaitLatchOrSocket(&MyProc->procLatch,
+			rc = WaitLatchOrSocket(MyLatch,
 								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
 								   WL_LATCH_SET,
 								   PQsocket(streamConn),
 								   0,
 								   WAIT_EVENT_LIBPQWALRECEIVER);
+
+			/* Emergency bailout? */
 			if (rc & WL_POSTMASTER_DEATH)
 				exit(1);
 
-			/* interrupted */
+			/* Interrupted? */
 			if (rc & WL_LATCH_SET)
 			{
+				ResetLatch(MyLatch);
 				CHECK_FOR_INTERRUPTS();
-				continue;
 			}
 			if (PQconsumeInput(streamConn) == 0)
 				return NULL;	/* trouble */
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5aaf24bfe4..5a3274b2c2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
+		/* emergency bailout if postmaster has died */
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(MyLatch);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 
 	return;
@@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 		LWLockRelease(LogicalRepWorkerLock);
 
-		CHECK_FOR_INTERRUPTS();
-
 		/* Wait for signal. */
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
@@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(&MyProc->procLatch);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 
 		/* Check worker status. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		CHECK_FOR_INTERRUPTS();
 
 		/* Wait for more work. */
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(&MyProc->procLatch);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 }
 
@@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg)
 		}
 
 		/* Wait for more work. */
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   wait_time,
 					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
 		if (got_SIGHUP)
 		{
 			got_SIGHUP = false;
 			ProcessConfigFile(PGC_SIGHUP);
 		}
-
-		ResetLatch(&MyProc->procLatch);
 	}
 
 	LogicalRepCtx->launcher_pid = 0;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ed66602935..6e55d2d606 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		if (!worker)
 			return false;
 
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 	}
 
 	return false;
@@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state)
 		if (MyLogicalRepWorker->relstate == expected_state)
 			return true;
 
-		rc = WaitLatch(&MyProc->procLatch,
+		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 	}
 
 	return false;
@@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		/*
 		 * Wait for more data or latch.
 		 */
-		rc = WaitLatchOrSocket(&MyProc->procLatch,
+		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
 							   fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
@@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 	}
 
 	return bytesread;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 51a64487cd..999d627c87 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		/*
 		 * Wait for more data or latch.
 		 */
-		rc = WaitLatchOrSocket(&MyProc->procLatch,
+		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
 							   fd, NAPTIME_PER_CYCLE,
@@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
 		if (got_SIGHUP)
 		{
 			got_SIGHUP = false;
@@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			send_feedback(last_received, requestReply, requestReply);
 		}
-
-		ResetLatch(&MyProc->procLatch);
 	}
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 5afb21121b..b4b7d28dd5 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	{
 		cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
 		AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
-						  &MyProc->procLatch, NULL);
+						  MyLatch, NULL);
 	}
 
 	/*
 	 * Reset my latch before adding myself to the queue and before entering
 	 * the caller's predicate loop.
 	 */
-	ResetLatch(&MyProc->procLatch);
+	ResetLatch(MyLatch);
 
 	/* Add myself to the wait queue. */
 	SpinLockAcquire(&cv->mutex);
@@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 		WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
 
 		/* Reset latch before testing whether we can return. */
-		ResetLatch(&MyProc->procLatch);
+		ResetLatch(MyLatch);
 
 		/*
 		 * If this process has been taken out of the wait list, then we know
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index 9abfc714a9..553baf0045 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg)
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
+		CHECK_FOR_INTERRUPTS();
+
 		/*
 		 * In case of a SIGHUP, just reload the configuration.
 		 */
-- 
2.12.0.264.gd6db3f2165.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to