On 2017-06-06 12:53:21 -0700, Andres Freund wrote: > On 2017-06-06 15:48:42 -0400, Robert Haas wrote: > > On Fri, Jun 2, 2017 at 3:24 PM, Andres Freund <and...@anarazel.de> wrote: > > > Latches work in single user mode, it's just that the new code for some > > > reason uses uninitialized memory as the latch. As I pointed out above, > > > the new code really should just use MyLatch instead of > > > MyProc->procLatch. > > FWIW, I'd misremembered some code here, and we actually reach the > function initializing the shared latch, even in single user mode. > > > > We seem to have accumulated quite a few instance of that. > > > > [rhaas pgsql]$ git grep MyLatch | wc -l > > 116 > > [rhaas pgsql]$ git grep 'MyProc->procLatch' | wc -l > > 33 > > > > Most of the offenders are in src/backend/replication, but there are > > some that are related to parallelism as well (bgworker.c, pqmq.c, > > parallel.c, condition_variable.c). Maybe we (you?) should just go and > > change them all. I don't think using MyLatch instead of > > MyProc->procLatch has become automatic for everyone yet. > > Nevertheless this should be changed. Will do.
Here's the patch for that, also addressing some issues I found while updating those callsites (separate thread started, too). - Andres
>From 9206ced1dc05d3a9cc99faafa22d5d8b16d998d1 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Tue, 6 Jun 2017 16:13:00 -0700 Subject: [PATCH] Clean up latch related code. The larger part of this patch replaces usages of MyProc->procLatch with MyLatch. The latter works even early during backend startup, where MyProc->procLatch doesn't yet. While the affected code shouldn't run in cases where it's not initialized, it might get copied into places where it might. Using MyLatch is simpler and a bit faster to boot, so there's little point to stick with the previous coding. While doing so I noticed some weaknesses around newly introduced uses of latches that could lead to missed events, and an omitted CHECK_FOR_INTERRUPTS() call in worker_spi. As all the actual bugs are in v10 code, there doesn't seem to be sufficient reason to backpatch this. Author: Andres Freund Discussion: https://postgr.es/m/20170606195321.sjmenrfgl2nu6...@alap3.anarazel.de https://postgr.es/m/20170606210405.sim3yl6vpudhm...@alap3.anarazel.de Backpatch: - --- src/backend/access/transam/parallel.c | 4 +-- src/backend/libpq/pqmq.c | 4 +-- src/backend/postmaster/bgworker.c | 4 +-- .../libpqwalreceiver/libpqwalreceiver.c | 13 ++++---- src/backend/replication/logical/launcher.c | 35 +++++++++++++++------- src/backend/replication/logical/tablesync.c | 12 ++++---- src/backend/replication/logical/worker.c | 10 +++++-- src/backend/storage/lmgr/condition_variable.c | 6 ++-- src/test/modules/worker_spi/worker_spi.c | 2 ++ 9 files changed, 56 insertions(+), 34 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index cb22174270..16a0bee610 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) if (!anyone_alive) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1, + WaitLatch(MyLatch, WL_LATCH_SET, -1, WAIT_EVENT_PARALLEL_FINISH); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } if (pcxt->toc != NULL) diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 96939327c3..8fbc03819d 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len) if (result != SHM_MQ_WOULD_BLOCK) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0, + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_PUT_MESSAGE); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index c3454276bf..712d700481 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) if (status == BGWH_STOPPED) break; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) break; } - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return status; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 726d1b5bd8..89c34b8225 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ? WL_SOCKET_READABLE : WL_SOCKET_WRITEABLE); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_LATCH_SET | io_flag, PQsocket(conn->streamConn), @@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, /* Interrupted? */ if (rc & WL_LATCH_SET) { - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } @@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * the signal arrives in the middle of establishment of * replication connection. */ - ResetLatch(&MyProc->procLatch); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_LATCH_SET, PQsocket(streamConn), 0, WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout? */ if (rc & WL_POSTMASTER_DEATH) exit(1); - /* interrupted */ + /* Interrupted? */ if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); - continue; } if (PQconsumeInput(streamConn) == 0) return NULL; /* trouble */ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5aaf24bfe4..5a3274b2c2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); + /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(MyLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } return; @@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); - CHECK_FOR_INTERRUPTS(); - /* Wait for signal. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); @@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } /* Check worker status. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) CHECK_FOR_INTERRUPTS(); /* Wait for more work. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } } @@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg) } /* Wait for more work. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); @@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); } - - ResetLatch(&MyProc->procLatch); } LogicalRepCtx->launcher_pid = 0; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ed66602935..6e55d2d606 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) if (!worker) return false; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); @@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return false; @@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state) if (MyLogicalRepWorker->relstate == expected_state) return true; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); @@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return false; @@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); @@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return bytesread; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 51a64487cd..999d627c87 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, NAPTIME_PER_CYCLE, @@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; @@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); } - - ResetLatch(&MyProc->procLatch); } } diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 5afb21121b..b4b7d28dd5 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) { cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1); AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET, - &MyProc->procLatch, NULL); + MyLatch, NULL); } /* * Reset my latch before adding myself to the queue and before entering * the caller's predicate loop. */ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* Add myself to the wait queue. */ SpinLockAcquire(&cv->mutex); @@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info); /* Reset latch before testing whether we can return. */ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* * If this process has been taken out of the wait list, then we know diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index 9abfc714a9..553baf0045 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + CHECK_FOR_INTERRUPTS(); + /* * In case of a SIGHUP, just reload the configuration. */ -- 2.12.0.264.gd6db3f2165.dirty
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers