Hi, Tomas. Thank you so much for your review! Please find the revised patchset attached.

On Thu, Mar 13, 2025 at 4:15 PM Tomas Vondra <to...@vondra.me> wrote:
> I did a quick look at this patch. I haven't found any correctness
> issues, but I have some general review comments and questions about the
> grammar / syntax.
>
> 1) The sgml docs don't really show the syntax very nicely, it only shows
> this at the beginning of wait_for.sgml:
>
>     WAIT FOR ( <replaceable class="parameter">parameter</replaceable>
>     '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
>
> I kinda understand this comes from using the generic option list (I'll
> get to that shortly), but I think it'd be much better to actually show
> the "full" syntax here, instead of leaving the "parameters" to later.

Sounds reasonable, changed to show the full syntax in the synopsis.

> 2) The syntax description suggests "(" and ")" are required, but that
> does not seem to be the case - in fact, it's not even optional, and when
> I try using that, I get syntax error.

Good catch, fixed.

> 3) I have my doubts about using the generic_option_list for this. Yes, I
> understand this allows using fewer reserved keywords, but it leads to
> some weirdness and I'm not sure it's worth it. Not sure what the right
> trade off is here.
>
> Anyway, some examples of the weird stuff implied by this approach:
>
> - it forces "," between the options, which is a clear difference from
> what we do for every other command
>
> - it forces everything to be a string, i.e. you can' say "TIMEOUT 10",
> it has to be "TIMEOUT '10'"
>
> I don't have a very strong opinion on this, but the result seems a bit
> strange to me.

I've improved the syntax. I still tried to keep the number of new
keywords and grammar rules minimal. That leads to moving some parser
logic into wait.c. This is probably a bit awkward, but it saves our
grammar from bloat. Let me know what you think about this approach.

> 4) I'm not sure I understand the motivation of the "throw false" mode,
> and I'm not sure I understand this description in the sgml docs:
>
>     On timeout, or if the server is promoted before
>     <parameter>lsn</parameter> is reached, an error is emitted,
>     as soon as <parameter>throw</parameter> is not specified or set to
>     true.
>     If <parameter>throw</parameter> is set to false, then the command
>     doesn't throw errors.
>
> I find it a bit confusing. What is the use case for this mode?

The idea here is that an application can handle these conditions
without having to parse error messages (parsing error messages is
inconvenient because of localization, etc.).

> 5) One place in the docs says:
>
>     The target log sequence number to wait for.
>
> Thie is literally the only place using "log sequence number" in our
> code base, I'd just use "LSN" just like every other place.

OK, fixed.

> 6) The docs for the TIMEOUT parameter say this:
>
>     <varlistentry>
>     <term><replaceable class="parameter">timeout</replaceable></term>
>     <listitem>
>     <para>
>     When specified and greater than zero, the command waits until
>     <parameter>lsn</parameter> is reached or the specified
>     <parameter>timeout</parameter> has elapsed. Must be a non-
>     negative integer, the default is zero.
>     </para>
>     </listitem>
>     </varlistentry>
>
> That doesn't say what unit does the option use. Is is seconds,
> milliseconds or what?
>
> In fact, it'd be nice to let users specify that in the value, similar
> to other options (e.g. SET statement_timeout = '10s').

The docs now state that the default unit is milliseconds. An
alternative way to specify the timeout is also supported: the timeout
can be a string literal consisting of a number and a unit specifier.

> 7) One place in the docs says this:
>
>     That is, after this function execution, the value returned by
>     <function>pg_last_wal_replay_lsn</function> should be greater ...
>
> I think the reference to "function execution" is obsolete?

Actually, this is just the function that reports the current replay LSN
(pg_last_wal_replay_lsn), not the function introduced by a previous
version of this patch. We refer to it only to express the constraint
that the LSN must have been replayed once the command has executed.

> 8) I find this confusing:
>
>     However, if <command>WAIT FOR</command> is
>     called on primary promoted from standby and <literal>lsn</literal>
>     was already replayed, then the <command>WAIT FOR</command> command
>     just exits immediately.
>
> Does this mean running the WAIT command on a primary (after it was
> already promoted) will exit immediately? Why does it matter that it
> was promoted from a standby? Shouldn't it exit immediately even for
> a standalone instance?

I think the previous sentence should make it clear that an error is
thrown otherwise. That also happens immediately, for sure.

> 9) xlogwait.c
>
> I think this should start with a basic "design" description of how the
> wait is implemented, in a comment at the top of the file. That is, what
> we keep in the shared memory, what happens during a wait, how it uses
> the pairing heap, etc. After reading this comment I should understand
> how it all fits together.

OK, I've added the header comment.

> 10) WaitForLSNReplay / WaitLSNWakeup
>
> I think the function comment should document the important stuff (e.g.
> return values for various situations, how it groups waiters into chunks
> of 16 elements during wakeup, ...).

Revised the header comments for those functions too.

> 11) WaitLSNProcInfo / WaitLSNState
>
> Does this need to be exposed in xlogwait.h? These structs seem private
> to xlogwait.c, so maybe declare it there?

Hmm, I don't remember why I moved them to xlogwait.h. OK, moved them
back to xlogwait.c.

------
Regards,
Alexander Korotkov
Supabase
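
For reference on point 6, the two timeout forms (a bare number of
milliseconds, or a number followed by a unit) can be illustrated with a
small standalone C sketch. This is not the code from wait.c: the helper
name timeout_to_ms and its unit table are assumptions made up for
illustration, with the unit names mirroring the ones that GUC time
settings accept, and it may well differ from how the patch parses the
value.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical unit table: how many milliseconds one unit is worth. */
typedef struct TimeUnit
{
	const char *name;
	double		ms;
} TimeUnit;

static const TimeUnit time_units[] = {
	{"us", 0.001},
	{"ms", 1.0},
	{"s", 1000.0},
	{"min", 60000.0},
	{"h", 3600000.0},
	{"d", 86400000.0},
};

/*
 * Convert a timeout literal such as "100" or "0.1 s" to milliseconds.
 * A bare number is taken as milliseconds.  Returns false for malformed
 * input, negative or non-finite values, and unknown units.
 * (Illustrative helper only; the name is an assumption.)
 */
bool
timeout_to_ms(const char *str, int64_t *out_ms)
{
	char	   *end;
	double		value;
	double		mult = 1.0;		/* default unit: milliseconds */

	assert(str != NULL && out_ms != NULL);

	value = strtod(str, &end);
	if (end == str || !(value >= 0.0))
		return false;			/* no number, negative, or NaN */

	/* Allow whitespace between the number and the unit. */
	while (isspace((unsigned char) *end))
		end++;

	if (*end != '\0')
	{
		bool		found = false;

		for (size_t i = 0; i < sizeof(time_units) / sizeof(time_units[0]); i++)
		{
			if (strcmp(end, time_units[i].name) == 0)
			{
				mult = time_units[i].ms;
				found = true;
				break;
			}
		}
		if (!found)
			return false;		/* unknown unit */
	}

	/* Reject values too large to convert safely (this also covers infinity). */
	if (value * mult > 1e15)
		return false;

	/* Round to the nearest millisecond. */
	*out_ms = (int64_t) (value * mult + 0.5);
	return true;
}
```

With this sketch, "100" and "0.1 s" both come out as 100 milliseconds,
which is the equivalence the doc examples rely on (TIMEOUT 100 and
TIMEOUT '0.1 s').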

From 11f1b1db81ff323354035dba34a34f5ac55177a3 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Mon, 10 Mar 2025 12:59:38 +0200
Subject: [PATCH v6] Implement WAIT FOR command

WAIT FOR is to be used on standby and specifies waiting for
the specific WAL location to be replayed.  This option is useful when
the user makes some data changes on primary and needs a guarantee that
these changes are visible on standby.

The queue of waiters is stored in the shared memory as an LSN-ordered
pairing heap, where the waiter with the nearest LSN stays on the top.
During the replay of WAL, waiters whose LSNs have already been replayed
are deleted from the shared memory pairing heap and woken up by setting
their latches.

WAIT FOR needs to wait without any snapshot held.  Otherwise, the
snapshot could prevent the replay of WAL records, implying a kind of
self-deadlock.  This is why a separate utility command appears to be the
most robust way to implement this functionality.  It's not possible to
implement this as a function.  Previous experience shows that stored
procedures also have limitations in this respect.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan <i.kartyshov@postgrespro.ru>
Author: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
---
 doc/src/sgml/high-availability.sgml       |  54 +++
 doc/src/sgml/ref/allfiles.sgml            |   1 +
 doc/src/sgml/ref/wait_for.sgml            | 226 +++++++++
 doc/src/sgml/reference.sgml               |   1 +
 src/backend/access/transam/Makefile       |   3 +-
 src/backend/access/transam/meson.build    |   1 +
 src/backend/access/transam/xact.c         |   6 +
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/access/transam/xlogrecovery.c |  11 +
 src/backend/access/transam/xlogwait.c     | 435 ++++++++++++++++++
 src/backend/commands/Makefile             |   3 +-
 src/backend/commands/meson.build          |   1 +
 src/backend/commands/wait.c               | 235 ++++++++++
 src/backend/lib/pairingheap.c             |  18 +-
 src/backend/parser/gram.y                 |  29 +-
 src/backend/storage/ipc/ipci.c            |   3 +
 src/backend/storage/lmgr/proc.c           |   6 +
 src/backend/tcop/pquery.c                 |  12 +-
 src/backend/tcop/utility.c                |  22 +
 .../utils/activity/wait_event_names.txt   |   2 +
 src/include/access/xlogwait.h             |  41 ++
 src/include/commands/wait.h               |  21 +
 src/include/lib/pairingheap.h             |   3 +
 src/include/nodes/parsenodes.h            |   7 +
 src/include/parser/kwlist.h               |   1 +
 src/include/storage/lwlocklist.h          |   1 +
 src/include/tcop/cmdtaglist.h             |   1 +
 src/test/recovery/meson.build             |   1 +
 src/test/recovery/t/046_wait_for_lsn.pl   | 217 +++++++++
 src/tools/pgindent/typedefs.list          |   5 +
 30 files changed, 1363 insertions(+), 11 deletions(-)
 create mode 100644 doc/src/sgml/ref/wait_for.sgml
create mode 100644 src/backend/access/transam/xlogwait.c create mode 100644 src/backend/commands/wait.c create mode 100644 src/include/access/xlogwait.h create mode 100644 src/include/commands/wait.h create mode 100644 src/test/recovery/t/046_wait_for_lsn.pl diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index b47d8b4106e..e29141c0538 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1376,6 +1376,60 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' </sect3> </sect2> + <sect2 id="read-your-writes-consistency"> + <title>Read-Your-Writes Consistency</title> + + <para> + In asynchronous replication, there is always a short window where changes + on the primary may not yet be visible on the standby due to replication + lag. This can lead to inconsistencies when an application writes data on + the primary and then immediately issues a read query on the standby. + However, it possible to address this without switching to the synchronous + replication + </para> + + <para> + To address this, PostgreSQL offers a mechanism for read-your-writes + consistency. The key idea is to ensure that a client sees its own writes + by synchronizing the WAL replay on the standby with the known point of + change on the primary. + </para> + + <para> + This is achieved by the following steps. After performing write + operations, the application retrieves the current WAL location using a + function call like this. + + <programlisting> +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + </programlisting> + </para> + + <para> + The <acronym>LSN</acronym> obtained from the primary is then communicated + to the standby server. This can be managed at the application level or + via the connection pooler. 
On the standby, the application issues the + <xref linkend="sql-wait-for"/> command to block further processing until + the standby's WAL replay process reaches (or exceeds) the specified + <acronym>LSN</acronym>. + + <programlisting> +postgres=# WAIT FOR LSN '0/306EE20'; + RESULT STATUS +--------------- + success +(1 row) + </programlisting> + Once the command returns a status of success, it guarantees that all + changes up to the provided <acronym>LSN</acronym> have been applied, + ensuring that subsequent read queries will reflect the latest updates. + </para> + </sect2> + <sect2 id="continuous-archiving-in-standby"> <title>Continuous Archiving in Standby</title> diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index f5be638867a..e167406c744 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory. <!ENTITY update SYSTEM "update.sgml"> <!ENTITY vacuum SYSTEM "vacuum.sgml"> <!ENTITY values SYSTEM "values.sgml"> +<!ENTITY waitFor SYSTEM "wait_for.sgml"> <!-- applications and utilities --> <!ENTITY clusterdb SYSTEM "clusterdb.sgml"> diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml new file mode 100644 index 00000000000..ff3f309bc7c --- /dev/null +++ b/doc/src/sgml/ref/wait_for.sgml @@ -0,0 +1,226 @@ +<!-- +doc/src/sgml/ref/wait_for.sgml +PostgreSQL documentation +--> + +<refentry id="sql-wait-for"> + <indexterm zone="sql-wait-for"> + <primary>AFTER</primary> + </indexterm> + + <refmeta> + <refentrytitle>WAIT FOR</refentrytitle> + <manvolnum>7</manvolnum> + <refmiscinfo>SQL - Language Statements</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>WAIT FOR</refname> + <refpurpose>wait target <acronym>LSN</acronym> to be replayed within the specified timeout</refpurpose> + </refnamediv> + + <refsynopsisdiv> +<synopsis> +WAIT FOR ( <replaceable class="parameter">option</replaceable> [, ... 
] ) ] +ALTER ROLE <replaceable class="parameter">role_specification</replaceable> [ WITH ] <replaceable class="parameter">option</replaceable> [ ... ] + +<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase> + + LSN '<replaceable class="parameter">lsn</replaceable>' + | TIMEOUT <replaceable class="parameter">timeout</replaceable> + | NO_THROW +</synopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + + <para> + Waits until recovery replays <parameter>lsn</parameter>. + If no <parameter>timeout</parameter> is specified or it is set to + zero, this command waits indefinitely for the + <parameter>lsn</parameter>. + On timeout, or if the server is promoted before + <parameter>lsn</parameter> is reached, an error is emitted, + as soon as <literal>NO_THROW</literal> is not specified. + If <parameter>NO_THROW</parameter> is specified, then the command + doesn't throw errors. + </para> + + <para> + The possible return values are <literal>success</literal>, + <literal>timeout</literal>, and <literal>not in recovery</literal>. + </para> + </refsect1> + + <refsect1> + <title>Options</title> + + <variablelist> + <varlistentry> + <term><literal>LSN</literal> '<replaceable class="parameter">lsn</replaceable>'</term> + <listitem> + <para> + Specifies the target <acronym>LSN</acronym> to wait for. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>TIMEOUT</literal> <replaceable class="parameter">timeout</replaceable></term> + <listitem> + <para> + When specified and <parameter>timeout</parameter> is greater than zero, + the command waits until <parameter>lsn</parameter> is reached or + the specified <parameter>timeout</parameter> has elapsed. + </para> + <para> + The <parameter>timeout</parameter> might be given as integer number of + milliseconds. Also it might be given as string literal with + integer number of milliseconds or a number with unit + (see <xref linkend="config-setting-names-values"/>). 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>NO_THROW</literal></term> + <listitem> + <para> + Specify to not throw an error in the case of timeout or + running on the primary. In this case the result status can be get from + the return value. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>Return values</title> + + <variablelist> + <varlistentry> + <term><replaceable class="parameter">success</replaceable></term> + <listitem> + <para> + This return value denotes that we have successfully reached + the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="parameter">timeout</replaceable></term> + <listitem> + <para> + This return value denotes that the timeout happened before reaching + the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="parameter">not in recovery</replaceable></term> + <listitem> + <para> + This return value denotes that the database server is not in a recovery + state. This might mean either the database server was not in recovery + at the moment of receiving the command, or it was promoted before + reaching the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>Notes</title> + + <para> + <command>WAIT FOR</command> command waits till + <parameter>lsn</parameter> to be replayed on standby. + That is, after this function execution, the value returned by + <function>pg_last_wal_replay_lsn</function> should be greater or equal + to the <parameter>lsn</parameter> value. This is useful to achieve + read-your-writes-consistency, while using async replica for reads and + primary for writes. In that case, the <acronym>lsn</acronym> of the last + modification should be stored on the client application side or the + connection pooler side. 
+ </para> + + <para> + <command>WAIT FOR</command> command should be called on standby. + If a user runs <command>WAIT FOR</command> on primary, it + will error out as soon as <parameter>NO_THROW</parameter> is not specified. + However, if <function>pg_wal_replay_wait</function> is + called on primary promoted from standby and <literal>lsn</literal> + was already replayed, then the <command>WAIT FOR</command> command just + exits immediately. + </para> + +</refsect1> + + <refsect1> + <title>Examples</title> + + <para> + You can use <command>WAIT FOR</command> command to wait for + the <type>pg_lsn</type> value. For example, an application could update + the <literal>movie</literal> table and get the <acronym>lsn</acronym> after + changes just made. This example uses <function>pg_current_wal_insert_lsn</function> + on primary server to get the <acronym>lsn</acronym> given that + <varname>synchronous_commit</varname> could be set to + <literal>off</literal>. + + <programlisting> +postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; +UPDATE 100 +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + </programlisting> + + Then an application could run <command>WAIT FOR</command> + with the <parameter>lsn</parameter> obtained from primary. After that the + changes made on primary should be guaranteed to be visible on replica. + + <programlisting> +postgres=# WAIT FOR LSN '0/306EE20'; + RESULT STATUS +--------------- + success +(1 row) +postgres=# SELECT * FROM movie WHERE genre = 'Drama'; + genre +------- +(0 rows) + </programlisting> + </para> + + <para> + It may also happen that target <parameter>lsn</parameter> is not reached + within the timeout. In that case the error is thrown. 
+ + <programlisting> +postgres=# WAIT FOR LSN '0/306EE20' TIMEOUT '0.1 s'; +ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 + </programlisting> + </para> + + <para> + The same example uses <command>WAIT FOR</command> with + <parameter>NO_THROW</parameter> option. + + <programlisting> +postgres=# WAIT FOR LSN '0/306EE20' TIMEOUT 100 NO_THROW; + RESULT STATUS +--------------- + timeout +(1 row) + </programlisting> + </para> + </refsect1> +</refentry> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index ff85ace83fc..2cf02c37b17 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -216,6 +216,7 @@ &update; &vacuum; &values; + &waitFor; </reference> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 661c55a9db7..a32f473e0a2 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -36,7 +36,8 @@ OBJS = \ xlogreader.o \ xlogrecovery.o \ xlogstats.o \ - xlogutils.o + xlogutils.o \ + xlogwait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build index e8ae9b13c8e..74a62ab3eab 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -24,6 +24,7 @@ backend_sources += files( 'xlogrecovery.c', 'xlogstats.c', 'xlogutils.c', + 'xlogwait.c', ) # used by frontend programs to build a frontend xlogreader diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b885513f765..511e5531fb8 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2831,6 +2832,11 @@ AbortTransaction(void) */ 
LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2d4c346473b..a0c98d9e801 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,6 +62,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -6361,6 +6362,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up all waiters for replay LSN. They need to report an error that + * recovery was ended before reaching the target LSN. + */ + WaitLSNWakeup(InvalidXLogRecPtr); + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 6ce979f2d8b..2097271b2f8 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,6 +40,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1836,6 +1837,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. 
+ */ + if (waitLSNState && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) + WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c new file mode 100644 index 00000000000..c2aee2d41f0 --- /dev/null +++ b/src/backend/access/transam/xlogwait.c @@ -0,0 +1,435 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.c + * Implements waiting for the given replay LSN, which is used in + * WAIT FOR lsn '...' + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogwait.c + * + * NOTES + * This file implements waiting for the replay of the given LSN on a + * physical standby. The core idea is very small: every backend that + * wants to wait publishes the LSN it needs to the shared memory, and + * the startup process wakes it once that LSN has been replayed. + * + * The shared memory used by this module comprises a procInfos + * per-backend array with the information of the awaited LSN for each + * of the backend processes. The elements of that array are organized + * into a pairing heap waitersHeap, which allows for very fast finding + * of the least awaited LSN. + * + * In addition, the least-awaited LSN is cached as minWaitedLSN. The + * waiter process publishes information about itself to the shared + * memory and waits on the latch before it wakens up by a startup + * process, timeout is reached, standby is promoted, or the postmaster + * dies. Then, it cleans information about itself in the shared memory. + * + * After replaying a WAL record, the startup process first performs a + * fast path check minWaitedLSN > replayLSN. 
If this check is negative, + * it checks waitersHeap and wakes up the backend whose awaited LSNs + * are reached. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <float.h> +#include <math.h> + +#include "access/xlog.h" +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +/* + * WaitLSNProcInfo - the shared memory structure representing information + * about the single process, which may wait for LSN replay. An item of + * waitLSN->procInfos array. + */ +typedef struct WaitLSNProcInfo +{ + /* LSN, which this process is waiting for */ + XLogRecPtr waitLSN; + + /* Process to wake up once the waitLSN is replayed */ + ProcNumber procno; + + /* + * A flag indicating that this item is present in + * waitLSNState->waitersHeap + */ + bool inHeap; + + /* A pairing heap node for participation in waitLSNState->waitersHeap */ + pairingheap_node phNode; +} WaitLSNProcInfo; + +/* + * WaitLSNState - the shared memory state for the replay LSN waiting facility. + */ +typedef struct WaitLSNState +{ + /* + * The minimum LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after replaying a + * WAL record. Could be read lock-less. Update protected by WaitLSNLock. + */ + pg_atomic_uint64 minWaitedLSN; + + /* + * A pairing heap of waiting processes order by LSN values (least LSN is + * on top). Protected by WaitLSNLock. + */ + pairingheap waitersHeap; + + /* + * An array with per-process information, indexed by the process number. + * Protected by WaitLSNLock. 
+ */ + WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + + +static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); + +struct WaitLSNState *waitLSNState = NULL; + +/* Report the amount of shared memory space needed for WaitLSNState. */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, procInfos); + size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); + return size; +} + +/* Initialize the WaitLSNState in the shared memory. */ +void +WaitLSNShmemInit(void) +{ + bool found; + + waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState", + WaitLSNShmemSize(), + &found); + if (!found) + { + pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL); + memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); + } +} + +/* + * Comparison function for waitLSN->waitersHeap heap. Waiting processes are + * ordered by lsn, so that the waiter with smallest lsn is at the top. + */ +static int +waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Update waitLSN->minWaitedLSN according to the current state of + * waitLSN->waitersHeap. 
+ */ +static void +updateMinWaitedLSN(void) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; + + if (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); + + minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; + } + + pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN); +} + +/* + * Put the current process into the heap of LSN waiters. + */ +static void +addLSNWaiter(XLogRecPtr lsn) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + Assert(!procInfo->inHeap); + + procInfo->procno = MyProcNumber; + procInfo->waitLSN = lsn; + + pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); + procInfo->inHeap = true; + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); +} + +/* + * Remove the current process from the heap of LSN waiters if it's there. + */ +static void +deleteLSNWaiter(void) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + if (!procInfo->inHeap) + { + LWLockRelease(WaitLSNLock); + return; + } + + pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode); + procInfo->inHeap = false; + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); +} + +/* + * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated + * on the stack. It should be enough to take single iteration for most cases. + */ +#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16) + +/* + * Remove waiters whose LSN has been replayed from the heap and set their + * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap + * and set latches for all waiters. + * + * This function first accumulates waiters to wake up into an array, then + * wakes them up without holding a WaitLSNLock. The array size is static and + * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. 
That should be more than enough + * to wake up all the waiters at once in the vast majority of cases. However, + * if there are more waiters, this function will loop to process them in + * multiple chunks. + */ +void +WaitLSNWakeup(XLogRecPtr currentLSN) +{ + int i; + ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE]; + int numWakeUpProcs; + + do + { + numWakeUpProcs = 0; + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + /* + * Iterate the pairing heap of waiting processes till we find LSN not + * yet replayed. Record the process numbers to wake up, but to avoid + * holding the lock for too long, send the wakeups only after + * releasing the lock. + */ + while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + + if (!XLogRecPtrIsInvalid(currentLSN) && + procInfo->waitLSN > currentLSN) + break; + + Assert(numWakeUpProcs < MaxBackends); + wakeUpProcs[numWakeUpProcs++] = procInfo->procno; + (void) pairingheap_remove_first(&waitLSNState->waitersHeap); + procInfo->inHeap = false; + + if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE) + break; + } + + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); + + /* + * Set latches for processes, whose waited LSNs are already replayed. + * As the time consuming operations, we do it this outside of + * WaitLSNLock. This is actually fine because procLatch isn't ever + * freed, so we just can potentially set the wrong process' (or no + * process') latch. + */ + for (i = 0; i < numWakeUpProcs; i++) + SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch); + + /* Need to recheck if there were more waiters than static array size. */ + } + while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE); +} + +/* + * Delete our item from shmem array if any. + */ +void +WaitLSNCleanup(void) +{ + /* + * We do a fast-path check of the 'inHeap' flag without the lock. 
This + * flag is set to true only by the process itself. So, it's only possible + * to get a false positive. But that will be eliminated by a recheck + * inside deleteLSNWaiter(). + */ + if (waitLSNState->procInfos[MyProcNumber].inHeap) + deleteLSNWaiter(); +} + +/* + * Wait using MyLatch till the given LSN is replayed, a timeout happens, the + * replica gets promoted, or the postmaster dies. + * + * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was replayed. Returns + * WAIT_LSN_RESULT_TIMEOUT if the timeout was reached before the target LSN + * replayed. Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery, + * or replica got promoted before the target LSN replayed. + */ +WaitLSNResult +WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) +{ + XLogRecPtr currentLSN; + TimestampTz endtime = 0; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends); + + if (!RecoveryInProgress()) + { + /* + * Recovery is not in progress. Given that we detected this in the + * very first check, this procedure was mistakenly called on primary. + * However, it's possible that standby was promoted concurrently to + * the procedure call, while target LSN is replayed. So, we still + * check the last replay LSN before reporting an error. + */ + if (PromoteIsTriggered() && targetLSN <= GetXLogReplayRecPtr(NULL)) + return WAIT_LSN_RESULT_SUCCESS; + return WAIT_LSN_RESULT_NOT_IN_RECOVERY; + } + else + { + /* If target LSN is already replayed, exit immediately */ + if (targetLSN <= GetXLogReplayRecPtr(NULL)) + return WAIT_LSN_RESULT_SUCCESS; + } + + if (timeout > 0) + { + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); + wake_events |= WL_TIMEOUT; + } + + /* + * Add our process to the pairing heap of waiters. It might happen that + * target LSN gets replayed before we do. 
Another check at the beginning + * of the loop below prevents the race condition. + */ + addLSNWaiter(targetLSN); + + for (;;) + { + int rc; + long delay_ms = 0; + + /* Recheck that recovery is still in-progress */ + if (!RecoveryInProgress()) + { + /* + * Recovery was ended, but recheck if target LSN was already + * replayed. See the comment regarding deleteLSNWaiter() below. + */ + deleteLSNWaiter(); + currentLSN = GetXLogReplayRecPtr(NULL); + if (PromoteIsTriggered() && targetLSN <= currentLSN) + return WAIT_LSN_RESULT_SUCCESS; + return WAIT_LSN_RESULT_NOT_IN_RECOVERY; + } + else + { + /* Check if the waited LSN has been replayed */ + currentLSN = GetXLogReplayRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + } + + /* + * If the timeout value is specified, calculate the number of + * milliseconds before the timeout. Exit if the timeout is already + * reached. + */ + if (timeout > 0) + { + delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime); + if (delay_ms <= 0) + break; + } + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, delay_ms, + WAIT_EVENT_WAIT_FOR_WAL_REPLAY); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN replay"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory pairing heap. We might + * already be deleted by the startup process. The 'inHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(); + + /* + * If we didn't reach the target LSN, we must be exited by timeout. 
+ */ + if (targetLSN > currentLSN) + return WAIT_LSN_RESULT_TIMEOUT; + + return WAIT_LSN_RESULT_SUCCESS; +} diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cb2fbdc7c60..f99acfd2b4b 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -64,6 +64,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index dd4cde41d32..9f640ad4810 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -53,4 +53,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'wait.c', ) diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..784c779a252 --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,235 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR, which allows waiting for events such as + * time passing or LSN having been replayed on replica. 
+ * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "catalog/pg_collation_d.h" +#include "commands/wait.h" +#include "executor/executor.h" +#include "nodes/print.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/fmgrprotos.h" +#include "utils/formatting.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +typedef enum +{ + WaitStmtParamNone, + WaitStmtParamTimeout, + WaitStmtParamLSN +} WaitStmtParam; + +void +ExecWaitStmt(WaitStmt *stmt, DestReceiver *dest) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + int64 timeout = 0; + WaitLSNResult waitLSNResult; + bool throw = true; + TupleDesc tupdesc; + TupOutputState *tstate; + const char *result = "<unset>"; + WaitStmtParam curParam = WaitStmtParamNone; + + /* + * Process the list of parameters. 
+ */ + foreach_ptr(Node, option, stmt->options) + { + if (IsA(option, String)) + { + String *str = castNode(String, option); + char *name = str_tolower(str->sval, strlen(str->sval), + DEFAULT_COLLATION_OID); + + if (curParam != WaitStmtParamNone) + elog(ERROR, "Unexpected param"); + + if (strcmp(name, "lsn") == 0) + curParam = WaitStmtParamLSN; + else if (strcmp(name, "timeout") == 0) + curParam = WaitStmtParamTimeout; + else if (strcmp(name, "no_throw") == 0) + throw = false; + else + elog(ERROR, "Unexpected param"); + + } + else if (IsA(option, Integer)) + { + Integer *intVal = castNode(Integer, option); + + if (curParam != WaitStmtParamTimeout) + elog(ERROR, "Unexpected integer"); + + timeout = intVal->ival; + + curParam = WaitStmtParamNone; + } + else if (IsA(option, A_Const)) + { + A_Const *constVal = castNode(A_Const, option); + String *str = &constVal->val.sval; + + if (curParam != WaitStmtParamLSN && + curParam != WaitStmtParamTimeout) + elog(ERROR, "Unexpected string"); + + if (curParam == WaitStmtParamLSN) + { + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(str->sval))); + } + else if (curParam == WaitStmtParamTimeout) + { + const char *hintmsg; + double result; + + if (!parse_real(str->sval, &result, GUC_UNIT_MS, &hintmsg)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for timeout option: \"%s\"", + str->sval), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + } + timeout = (int64) result; + } + + curParam = WaitStmtParamNone; + } + + } + + if (XLogRecPtrIsInvalid(lsn)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_PARAMETER), + errmsg("\"lsn\" must be specified"))); + + if (timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"timeout\" must not be negative"))); + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. 
+ * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why + * pg_wal_replay_wait() is a procedure, not a function. + * + * At first, we should check there is no active snapshot. According to + * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is + * processed with a snapshot. Thankfully, we can pop this snapshot, + * because PortalRunUtility() can tolerate this. + */ + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + + /* + * At second, invalidate a catalog snapshot if any. And we should be done + * with the preparation. + */ + InvalidateCatalogSnapshot(); + + /* Give up if there is still an active or registered snapshot. */ + if (HaveRegisteredOrActiveSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAIT FOR must be only called without an active or registered snapshot"), + errdetail("Make sure WAIT FOR isn't called within a transaction with an isolation level higher than READ COMMITTED, procedure, or a function."))); + + /* + * As the result we should hold no snapshot, and correspondingly our xmin + * should be unset. + */ + Assert(MyProc->xmin == InvalidTransactionId); + + waitLSNResult = WaitForLSNReplay(lsn, timeout); + + /* + * Process the result of WaitForLSNReplay(). Throw appropriate error if + * needed. 
+ */ + switch (waitLSNResult) + { + case WAIT_LSN_RESULT_SUCCESS: + /* Nothing to do on success */ + result = "success"; + break; + + case WAIT_LSN_RESULT_TIMEOUT: + if (throw) + ereport(ERROR, + (errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))))); + else + result = "timeout"; + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + if (throw) + { + if (PromoteIsTriggered()) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the replay LSN can only be executed during recovery."))); + } + } + else + result = "not in recovery"; + break; + } + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = WaitStmtResultDesc(stmt); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* Send it */ + do_text_output_oneline(tstate, result); + + end_tup_output(tstate); +} + +TupleDesc +WaitStmtResultDesc(WaitStmt *stmt) +{ + TupleDesc tupdesc; + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "RESULT STATUS", + TEXTOID, -1, 0); + return tupdesc; +} diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c index 0aef8a88f1b..fa8431f7946 100644 --- a/src/backend/lib/pairingheap.c +++ b/src/backend/lib/pairingheap.c @@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg) pairingheap *heap; heap = (pairingheap *) 
palloc(sizeof(pairingheap)); + pairingheap_initialize(heap, compare, arg); + + return heap; +} + +/* + * pairingheap_initialize + * + * Same as pairingheap_allocate(), but initializes the pairing heap in-place + * rather than allocating a new chunk of memory. Useful to store the pairing + * heap in a shared memory. + */ +void +pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, + void *arg) +{ heap->ph_compare = compare; heap->ph_arg = arg; heap->ph_root = NULL; - - return heap; } /* diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3c4268b271a..5ff7157a12a 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -303,7 +303,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt UnlistenStmt UpdateStmt VacuumStmt VariableResetStmt VariableSetStmt VariableShowStmt - ViewStmt CheckPointStmt CreateConversionStmt + ViewStmt WaitStmt CheckPointStmt CreateConversionStmt DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt @@ -672,6 +672,9 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); json_object_constructor_null_clause_opt json_array_constructor_null_clause_opt +%type <node> wait_option +%type <list> wait_option_list + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. 
@@ -786,7 +789,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -1114,6 +1117,7 @@ stmt: | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -16364,6 +16368,25 @@ xml_passing_mech: | BY VALUE_P ; +WaitStmt: + WAIT FOR wait_option_list + { + WaitStmt *n = makeNode(WaitStmt); + n->options = $3; + $$ = (Node *)n; + } + ; + +wait_option_list: + wait_option { $$ = list_make1($1); } + | wait_option_list wait_option { $$ = lappend($1, $2); } + ; + +wait_option: ColLabel { $$ = (Node *) makeString($1); } + | NumericOnly { $$ = (Node *) $1; } + | Sconst { $$ = (Node *) makeStringConst($1, @1); } + + ; /* * Aggregate decoration clauses @@ -18023,6 +18046,7 @@ unreserved_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHITESPACE_P | WITHIN | WITHOUT @@ -18680,6 +18704,7 @@ bare_label_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHEN | WHITESPACE_P | WORK diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 00c76d05356..87411aece47 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -24,6 +24,7 @@ #include "access/twophase.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); size = add_size(size, MemoryContextReportingShmemSize()); + size = add_size(size, WaitLSNShmemSize()); /* include 
additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -346,6 +348,7 @@ CreateOrAttachShmemStructs(void) InjectionPointShmemInit(); AioShmemInit(); MemoryContextReportingShmemInit(); + WaitLSNShmemInit(); } /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index f194e6b3dcc..c966acdbff0 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -948,6 +949,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8164d0fbb4f..f4d37c0bfc2 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1195,10 +1195,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, MemoryContextSwitchTo(portal->portalContext); /* - * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from - * under us, so don't complain if it's now empty. Otherwise, our snapshot - * should be the top one; pop it. Note that this could be a different - * snapshot from the one we made above; see EnsurePortalSnapshotExists. + * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot + * stack from under us, so don't complain if it's now empty. Otherwise, + * our snapshot should be the top one; pop it. Note that this could be a + * different snapshot from the one we made above; see + * EnsurePortalSnapshotExists. 
*/ if (portal->portalSnapshot != NULL && ActiveSnapshotSet()) { @@ -1793,7 +1794,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt) IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) || IsA(utilityStmt, UnlistenStmt) || - IsA(utilityStmt, CheckPointStmt)) + IsA(utilityStmt, CheckPointStmt) || + IsA(utilityStmt, WaitStmt)) return false; return true; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 25fe3d58016..d23ac3b0f0b 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -56,6 +56,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/wait.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree) case T_PrepareStmt: case T_UnlistenStmt: case T_VariableSetStmt: + case T_WaitStmt: { /* * These modify only backend-local state, so they're OK to run @@ -1065,6 +1067,12 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; } + case T_WaitStmt: + { + ExecWaitStmt((WaitStmt *) parsetree, dest); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(pstate, pstmt, queryString, @@ -2067,6 +2075,9 @@ UtilityReturnsTuples(Node *parsetree) case T_VariableShowStmt: return true; + case T_WaitStmt: + return true; + default: return false; } @@ -2122,6 +2133,9 @@ UtilityTupleDescriptor(Node *parsetree) return GetPGVariableResultDesc(n->name); } + case T_WaitStmt: + return WaitStmtResultDesc((WaitStmt *) parsetree); + default: return NULL; } @@ -3099,6 +3113,10 @@ CreateCommandTag(Node *parsetree) } break; + case T_WaitStmt: + tag = CMDTAG_WAIT; + break; + /* already-planned queries */ case T_PlannedStmt: { @@ -3697,6 +3715,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_DDL; break; + case T_WaitStmt: + lev = LOGSTMT_ALL; + break; + /* already-planned queries */ case T_PlannedStmt: { diff --git 
a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 930321905f1..164a16bc5d8 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." @@ -353,6 +354,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +WaitLSN "Waiting to read or update shared Wait-for-LSN state." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h new file mode 100644 index 00000000000..15bddd9dba3 --- /dev/null +++ b/src/include/access/xlogwait.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.h + * Declarations for LSN replay waiting routines. 
+ * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * src/include/access/xlogwait.h + * + *------------------------------------------------------------------------- + */ +#ifndef XLOG_WAIT_H +#define XLOG_WAIT_H + +#include "lib/pairingheap.h" +#include "port/atomics.h" +#include "postgres.h" +#include "storage/procnumber.h" +#include "storage/spin.h" +#include "tcop/dest.h" + +/* + * Result statuses for WaitForLSNReplay(). + */ +typedef enum +{ + WAIT_LSN_RESULT_SUCCESS, /* Target LSN is reached */ + WAIT_LSN_RESULT_TIMEOUT, /* Timeout occurred */ + WAIT_LSN_RESULT_NOT_IN_RECOVERY, /* Recovery ended before or during our + * wait */ +} WaitLSNResult; + +extern PGDLLIMPORT WaitLSNState *waitLSNState; + +extern Size WaitLSNShmemSize(void); +extern void WaitLSNShmemInit(void); +extern void WaitLSNWakeup(XLogRecPtr currentLSN); +extern void WaitLSNCleanup(void); +extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); + +#endif /* XLOG_WAIT_H */ diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h new file mode 100644 index 00000000000..b44c37aa4db --- /dev/null +++ b/src/include/commands/wait.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * wait.h + * prototypes for commands/wait.c + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * src/include/commands/wait.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_H +#define WAIT_H + +#include "nodes/parsenodes.h" +#include "tcop/dest.h" + +extern void ExecWaitStmt(WaitStmt *stmt, DestReceiver *dest); +extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt); + +#endif /* WAIT_H */ diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h index 3c57d3fda1b..567586f2ecf 100644 --- a/src/include/lib/pairingheap.h +++ b/src/include/lib/pairingheap.h @@ -77,6 +77,9 @@ typedef struct pairingheap extern pairingheap 
*pairingheap_allocate(pairingheap_comparator compare, void *arg); +extern void pairingheap_initialize(pairingheap *heap, + pairingheap_comparator compare, + void *arg); extern void pairingheap_free(pairingheap *heap); extern void pairingheap_add(pairingheap *heap, pairingheap_node *node); extern pairingheap_node *pairingheap_first(pairingheap *heap); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4610fc61293..d06104d40ac 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4326,4 +4326,11 @@ typedef struct DropSubscriptionStmt DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; +typedef struct WaitStmt +{ + NodeTag type; + List *options; +} WaitStmt; + + #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index a4af3f717a1..dec7ec6e5ec 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -494,6 +494,7 @@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..eb9de7dae00 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, WaitLSN) diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index d250a714d59..c4606d65043 100644 --- a/src/include/tcop/cmdtaglist.h +++ 
b/src/include/tcop/cmdtaglist.h @@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false) PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false) PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true) PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false) +PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false) diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index cb983766c67..31b1e9bffcf 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -54,6 +54,7 @@ tests += { 't/043_no_contrecord_switch.pl', 't/044_invalidate_inactive_slots.pl', 't/045_archive_restartpoint.pl', + 't/046_wait_for_lsn.pl', ], }, } diff --git a/src/test/recovery/t/046_wait_for_lsn.pl b/src/test/recovery/t/046_wait_for_lsn.pl new file mode 100644 index 00000000000..f9446cce3f9 --- /dev/null +++ b/src/test/recovery/t/046_wait_for_lsn.pl @@ -0,0 +1,217 @@ +# Checks waiting for LSN replay on standby using +# the WAIT FOR command. +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->start; + +# Add some content and take a backup +$node_primary->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Create a streaming standby with a 1 second delay from the backup +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->append_conf( + 'postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + +# 1. 
Make sure that WAIT FOR works: add new content to +# primary and memorize primary's insert LSN, then wait for that LSN to be +# replayed on standby. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn1}' TIMEOUT '1d'; + SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn); +]); + +# Make sure the current LSN on standby is at least as big as the LSN we +# observed on the primary before. +ok((split("\n", $output))[-1] >= 0, + "standby reached the same LSN as primary after WAIT FOR"); + +# 2. Check that new data is visible after calling WAIT FOR +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}'; + SELECT count(*) FROM wait_test; +]); + +# Make sure the count(*) on standby reflects the recent changes on primary +ok((split("\n", $output))[-1] eq 30, + "standby reached the same LSN as primary"); + +# 3. Check that waiting for unreachable LSN triggers the timeout. The +# unreachable LSN must be far enough ahead that WAL records issued by +# a concurrent autovacuum cannot reach it. 
+my $lsn3 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $stderr; +$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn2}' TIMEOUT 10;"); +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${lsn3}' TIMEOUT 1000;", + stderr => \$stderr); +ok( $stderr =~ /timed out while waiting for target LSN/, + "get timeout on waiting for unreachable LSN"); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}' TIMEOUT '0.1 s' NO_THROW;]); +ok($output eq "success", + "WAIT FOR returns correct status after successful waiting"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn3}' TIMEOUT 10 NO_THROW;]); +ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); + +# 4. Check that WAIT FOR triggers an error if called on primary, +# within another function, or inside a transaction with an isolation level +# higher than READ COMMITTED. + +$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ /recovery is not in progress/, + "get an error when running on the primary"); + +$node_standby->psql( + 'postgres', + "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running in a transaction with an isolation level higher than READ COMMITTED" +); + +$node_primary->safe_psql( + 'postgres', qq[ +CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$ + BEGIN + EXECUTE format('WAIT FOR LSN %L;', target_lsn); + END +\$\$ +LANGUAGE plpgsql; +]); + +$node_primary->wait_for_catchup($node_standby); +$node_standby->psql( + 'postgres', + "SELECT pg_wal_replay_wait_wrap('${lsn3}');", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running within another function"); + 
+# 5. Also, check the scenario of multiple LSN waiters. We start 5 background +# psql sessions, each waiting for a corresponding insertion. When the wait +# finishes, a stored function logs whether as many rows are visible as +# expected. +$node_primary->safe_psql( + 'postgres', qq[ +CREATE FUNCTION log_count(i int) RETURNS void AS \$\$ + DECLARE + count int; + BEGIN + SELECT count(*) FROM wait_test INTO count; + IF count >= 31 + i THEN + RAISE LOG 'count %', i; + END IF; + END +\$\$ +LANGUAGE plpgsql; +]); +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); +my @psql_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (${i});"); + my $lsn = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); + $psql_sessions[$i] = $node_standby->background_psql('postgres'); + $psql_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR lsn '${lsn}'; + SELECT log_count(${i}); + ]); +} +my $log_offset = -s $node_standby->logfile; +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("count ${i}", $log_offset); + $psql_sessions[$i]->quit; +} + +ok(1, 'multiple LSN waiters reported consistent data'); + +# 6. Check that the standby promotion terminates the wait on LSN. Start +# waiting for an unreachable LSN then promote. Check the log for the relevant +# error message. Also, check that waiting for an already replayed LSN doesn't +# cause an error even after promotion. 
+my $lsn4 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $lsn5 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $psql_session = $node_standby->background_psql('postgres'); +$psql_session->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn4}'; +]); + +# Make sure standby will be promoted at least at the primary insert LSN we +# have just observed. Use pg_switch_wal() to force the insert LSN to be +# written then wait for standby to catchup. +$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$node_primary->wait_for_catchup($node_standby); + +$log_offset = -s $node_standby->logfile; +$node_standby->promote; +$node_standby->wait_for_log('recovery is not in progress', $log_offset); + +ok(1, 'got error after standby promote'); + +$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';"); + +ok(1, 'wait for already replayed LSN exits immediately even after promotion'); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn4}' TIMEOUT '10ms' NO_THROW;]); +ok($output eq "not in recovery", + "WAIT FOR returns correct status after standby promotion"); + +$node_standby->stop; +$node_primary->stop; + +# If we send \q with $psql_session->quit the command can be sent to the session +# already closed. So \q is in initial script, here we only finish IPC::Run. +$psql_session->{run}->finish; + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e5879e00dff..be191d3e2d9 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3236,7 +3236,12 @@ WaitEventIO WaitEventIPC WaitEventSet WaitEventTimeout +WaitLSNProcInfo +WaitLSNResult +WaitLSNState WaitPMResult +WaitStmt +WaitStmtParam WalCloseMethod WalCompression WalInsertClass -- 2.39.5 (Apple Git-154)
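
For anyone following the WaitLSNWakeup() logic in the patch above, the batching pattern can be looked at in isolation. The following is only a standalone sketch under stated assumptions, not the patch's code: a sorted array stands in for the pairing heap, the "lock" is marked only by comments, a `woken` array stands in for the SetLatch() calls, and the names `WaiterQueue`, `queue_add`, `queue_wakeup`, `WAKEUP_BATCH` and `MAX_WAITERS` are hypothetical. It shows the one idea that matters: collect at most a fixed number of waiters per lock hold, wake them after the lock is released, and repeat only if the batch came back full.

```c
#include <assert.h>
#include <stdint.h>

#define WAKEUP_BATCH 4			/* hypothetical stand-in for the static array size */
#define MAX_WAITERS 16			/* hypothetical capacity of this toy queue */

typedef struct
{
	uint64_t	wait_lsn[MAX_WAITERS];	/* sorted ascending; stands in for the heap */
	int			procno[MAX_WAITERS];
	int			nwaiters;
	int			woken[MAX_WAITERS];		/* records wakeups in order, for illustration */
	int			nwoken;
} WaiterQueue;

/* Insert a waiter, keeping the array ordered by the LSN it waits for. */
static void
queue_add(WaiterQueue *q, uint64_t lsn, int procno)
{
	int			i = q->nwaiters;

	assert(q->nwaiters < MAX_WAITERS);
	while (i > 0 && q->wait_lsn[i - 1] > lsn)
	{
		q->wait_lsn[i] = q->wait_lsn[i - 1];
		q->procno[i] = q->procno[i - 1];
		i--;
	}
	q->wait_lsn[i] = lsn;
	q->procno[i] = procno;
	q->nwaiters++;
}

/*
 * Wake every waiter whose LSN is <= current_lsn, at most WAKEUP_BATCH per
 * round. Returns the number of rounds, i.e. how often the lock would be taken.
 */
static int
queue_wakeup(WaiterQueue *q, uint64_t current_lsn)
{
	int			batch[WAKEUP_BATCH];
	int			nbatch;
	int			rounds = 0;

	do
	{
		int			taken = 0;
		int			i;

		nbatch = 0;

		/* The real code would acquire the lock here. */
		while (taken < q->nwaiters &&
			   q->wait_lsn[taken] <= current_lsn &&
			   nbatch < WAKEUP_BATCH)
			batch[nbatch++] = q->procno[taken++];

		/* Drop the collected prefix from the queue. */
		for (i = taken; i < q->nwaiters; i++)
		{
			q->wait_lsn[i - taken] = q->wait_lsn[i];
			q->procno[i - taken] = q->procno[i];
		}
		q->nwaiters -= taken;
		/* The real code would release the lock here. */

		/* Wake the collected waiters outside of the lock. */
		for (i = 0; i < nbatch; i++)
		{
			assert(q->nwoken < MAX_WAITERS);
			q->woken[q->nwoken++] = batch[i];
		}
		rounds++;

		/* A full batch means there may be more eligible waiters. */
	} while (nbatch == WAKEUP_BATCH);

	return rounds;
}
```

With six waiters at LSNs 10 through 60 and a batch size of 4, calling `queue_wakeup(&q, 50)` takes two rounds (four wakeups, then one) and leaves the waiter for LSN 60 queued. When the number of eligible waiters is an exact multiple of the batch size, the loop runs one extra, empty round; that is the price of not peeking at the queue after releasing the lock.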