Hi,

On Wed, Sep 24, 2025 at 4:59 PM Arseniy Mukhin <[email protected]> wrote:
>
> On Wed, Sep 24, 2025 at 1:40 AM Matheus Alcantara
> <[email protected]> wrote:
> > ...
> > > This way the listener reads the head that includes all writer's
> > > notifications and a snapshot where the writer is not in progress, so
> > > nothing stops the listener from sending these notifications and it's
> > > even possible to have the listener's position that is after the queue
> > > head, so yes, it's bad :( Sorry about that.
> > >
> > Yeah, this is bad. I'm wondering if we could reproduce such race
> > conditions scenarios with some TAP tests.
> >
>
> I agree it would be great to have more tests for such cases. As for
> the 'committed field' patch, I think we can add a TAP test that shows
> that listeners postpone processing of notifications until
> notifications were marked as 'committed=false' in case of aborted
> transactions. I tried to write one, but have not succeeded yet. Hope
> to finish it soon.

I finally managed to write a TAP test for it, so here is a new version
that includes it.

I also realized that we can increase the test coverage of
002_aborted_tx_notifies.pl if the notifications of the aborted
transaction span several pages. This way we test
asyncQueueRollbackNotifications() better, so I changed the
002_aborted_tx_notifies.pl TAP test a bit. There is also a small
indentation change in lmgr.h that should fix the git am warning.

I put all the changes in a separate patch file (0002) to make it clearer
what changed. Please feel free to merge any part of it that makes sense
to you into the main patch file, or to drop it.

Best regards,
Arseniy Mukhin

From 27cb1d870a8fdcdc1cf29c26586b05c3f8819cdb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Sat, 6 Sep 2025 11:29:02 -0300
Subject: [PATCH v5 1/2] Make AsyncQueueEntry's self contained

Previously, asyncQueueProcessPageEntries() used TransactionIdDidCommit()
to check whether the transaction that a notification belongs to had
committed. This works in almost all scenarios, but if a notification is
kept in the queue for too long and VACUUM FREEZE is executed in the
meantime, it may remove clog files that are needed to check the
transaction status of that notification, which causes errors in listener
backends when they read the async queue.

This commit fixes the issue by making AsyncQueueEntry self-contained: it
adds a "committed" boolean field that asyncQueueProcessPageEntries() uses
to check whether the notification's transaction committed.

We set committed to true when adding the entry to the SLRU page buffer
cache during PreCommit_Notify(). If an error occurs before
AtCommit_Notify(), AtAbort_Notify() is called and marks the committed
field as false. To do this, we remember the QUEUE_HEAD position before
PreCommit_Notify() starts adding entries to the shared queue; if the
transaction fails, we iterate from this saved position to the new
QUEUE_HEAD position, marking the entries as not committed.

This commit also includes TAP tests that exercise the VACUUM FREEZE issue
and the scenario of an error occurring between the PreCommit_Notify() and
AtCommit_Notify() calls.

Co-authored-by: Arseniy Mukhin <[email protected]> --- src/backend/commands/async.c | 185 +++++++++++++++++- src/backend/storage/lmgr/lmgr.c | 19 ++ src/include/storage/lmgr.h | 3 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_listen_notify/Makefile | 17 ++ .../modules/test_listen_notify/meson.build | 14 ++ .../test_listen_notify/t/001_xid_freeze.pl | 66 +++++++ .../t/002_aborted_tx_notifies.pl | 66 +++++++ src/tools/pgindent/typedefs.list | 1 + 10 files changed, 372 insertions(+), 1 deletion(-) create mode 100644 src/test/modules/test_listen_notify/Makefile create mode 100644 src/test/modules/test_listen_notify/meson.build create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl create mode 100644 src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..4e7c0e16d57 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -79,6 +79,19 @@ * are way behind and should be kicked to make them advance their * pointers). * + * The notification entries added to the queue are self-contained and + * include a "committed" field to inform listener backends if the associated + * transaction has committed. We could use the TransactionIdDidCommit() but + * if a notification remain in the queue long enough for VACUUM FREEZE to + * remove the necessary pg_xact/ file, the listener backend will face errors + * to get the transaction status. To prevent this, the "committed" field is + * set to true during PreCommit_Notify() and if the transaction aborts between + * the PreCommit_Notify() and AtCommit_Notify(), the AtAbort_Notify() is + * called to mark these entries as uncommitted. To enable this, we save the + * queue's head position before adding new entries from the in-progress to + * commit transaction. 
If an abort occurs, AtAbort_Notify() uses this saved + * position to find and mark the entries as uncommitted. + * * Finally, after we are out of the transaction altogether and about to go * idle, we scan the queue for messages that need to be sent to our * frontend (which might be notifies from other backends, or self-notifies @@ -142,6 +155,7 @@ #include "miscadmin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" @@ -180,6 +194,8 @@ typedef struct AsyncQueueEntry Oid dboid; /* sender's database OID */ TransactionId xid; /* sender's XID */ int32 srcPid; /* sender's PID */ + bool committed; /* Is transaction that the entry belongs + * committed? */ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; } AsyncQueueEntry; @@ -401,8 +417,27 @@ struct NotificationHash Notification *event; /* => the actual Notification struct */ }; +/* Information needed by AtAbort_Notify() to mark queue entries of aborted transactions as not committed. */ +typedef struct AtAbortNotifyInfo +{ + /* + * head position before the transaction start adding entries on the shared + * queue + */ + QueuePosition previousHead; + + /* + * head position after the entries from the in-progress to commit + * transaction were added. + */ + QueuePosition head; + +} AtAbortNotifyInfo; + static NotificationList *pendingNotifies = NULL; +static AtAbortNotifyInfo *atAbortInfo = NULL; + /* * Inbound notifications are initially processed by HandleNotifyInterrupt(), * called from inside a signal handler. 
That just sets the @@ -457,6 +492,7 @@ static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void asyncQueueRollbackNotifications(void); /* * Compute the difference between two queue page numbers. @@ -922,6 +958,18 @@ PreCommit_Notify(void) LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock); + /* + * Before start adding entries on the shared queue, save the current + * QUEUE_HEAD so if the current in-progress to commit transaction + * abort we can mark the notifications added by this aborted + * transaction as not committed. See AtAbort_Notify() for more info. + */ + Assert(atAbortInfo == NULL); + atAbortInfo = palloc(sizeof(AtAbortNotifyInfo)); + LWLockAcquire(NotifyQueueLock, LW_SHARED); + atAbortInfo->previousHead = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); + /* Now push the notifications into the queue */ nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) @@ -948,6 +996,17 @@ PreCommit_Notify(void) LWLockRelease(NotifyQueueLock); } + /* + * Save the new QUEUE_HEAD position so if another publisher add + * entries on the shared queue and successfully commit the transaction + * we don't change the committed status of these notifications while + * marking the notification from a aborted transaction as not + * committed. + */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + atAbortInfo->head = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); + /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ } } @@ -1402,6 +1461,13 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); + /* + * Mark the entry as committed. 
If the transaction that this + * notification belongs fails to commit the AtAbort_Notify() will mark + * this entry as not committed. + */ + qe.committed = true; + offset = QUEUE_POS_OFFSET(queue_head); /* Check whether the entry really fits on the current page */ @@ -1678,6 +1744,16 @@ AtAbort_Notify(void) if (amRegisteredListener && listenChannels == NIL) asyncQueueUnregister(); + /* + * AtAbort_Notify information is set when we are adding entries on the + * global shared queue at PreCommit_Notify(), so in case of an abort on + * the transaction between the PreCommit_Notify() and AtCommit_Notify() we + * use this information to mark the entries from the aborted transaction + * as not committed. + */ + if (atAbortInfo != NULL) + asyncQueueRollbackNotifications(); + /* And clean up */ ClearPendingActionsAndNotifies(); } @@ -2066,7 +2142,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, reachedStop = true; break; } - else if (TransactionIdDidCommit(qe->xid)) + else if (qe->committed) { /* qe->data is the null-terminated channel name */ char *channel = qe->data; @@ -2385,6 +2461,7 @@ ClearPendingActionsAndNotifies(void) */ pendingActions = NULL; pendingNotifies = NULL; + atAbortInfo = NULL; } /* @@ -2395,3 +2472,109 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + + +/* + * Mark notifications added on an in-progress to commit transaction as not committed. + * + * Notifications added on the shared global queue are added with committed = + * true during PreCommit_Notify() call. If an error occur between the + * PreCommit_Notify() and AtCommit_Notify() the AtAbort_Notify() will be called + * and we need to mark these notifications added on the shared queue by the + * aborted transaction as not committed so that listener backends can skip + * these notifications when reading the queue. 
+ * + * We previously rely on TransactionIdDidCommit() to check this but if a + * notification is keep for too long on the queue and the VACUUM FREEZE is + * executed during this period it can remove clog files that is needed to check + * the transaction status of this notification, so we make the notification + * entries self contained to skip this problem. + * + */ +static void +asyncQueueRollbackNotifications(void) +{ + QueuePosition current = atAbortInfo->previousHead; + QueuePosition head = atAbortInfo->head; + + /* + * Iterates from the position saved at the beginning of the transaction + * (previousHead) to the current head of the queue. We do this to mark all + * entries within this range as uncommitted in case of a transaction + * crash. + */ + for (;;) + { + int64 curpage = QUEUE_POS_PAGE(current); + int curoffset = QUEUE_POS_OFFSET(current); + LWLock *lock = SimpleLruGetBankLock(NotifyCtl, curpage); + int slotno; + + /* + * If we have reached the head, all entries from this transaction have + * been marked as not committed so break the loop. + */ + if (QUEUE_POS_EQUAL(current, head)) + break; + + /* + * Acquire an exclusive lock on the current SLRU page to ensure no + * other process can read or write to it while we are marking the + * entries. + */ + LWLockAcquire(lock, LW_EXCLUSIVE); + + /* Fetch the page from SLRU to mark entries as not committed. */ + slotno = SimpleLruReadPage(NotifyCtl, curpage, true, InvalidTransactionId); + + /* + * Loop through all entries on the current page. The loop will + * continue until we reach the end of the page or the current head. + */ + for (;;) + { + AsyncQueueEntry *qe; + bool reachedEndOfPage; + + /* + * Check again to stop processing the entries on the current page. + */ + if (QUEUE_POS_EQUAL(current, head)) + break; + + /* + * Get a pointer to the current entry within the shared page + * buffer. 
+ */ + qe = (AsyncQueueEntry *) (NotifyCtl->shared->page_buffer[slotno] + curoffset); + + /* + * Just for sanity, all entries on the shared queue should be + * marked as committed. + */ + Assert(qe->committed); + + /* Ensure that listener backends can not see these entries */ + Assert(TransactionIdIsInProgress(qe->xid)); + + /* + * Mark the entry as uncommitted so listener backends can skip + * this notification. + */ + qe->committed = false; + + /* Advance our position. */ + reachedEndOfPage = asyncQueueAdvance(&current, qe->length); + if (reachedEndOfPage) + break; + + /* + * Update the offset for the next iteration within the same page. + */ + curoffset = QUEUE_POS_OFFSET(current); + } + + /* Release the exclusive lock on the page. */ + LWLockRelease(lock); + } +} diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 4798eb79003..12a21c51452 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -357,6 +357,25 @@ CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger) return LockHeldByMe(&tag, lockmode, orstronger); } +/* + * CheckSharedObjectLockedByMe + * + * Like CheckRelationLockedByMe, but it checks for shared objects. 
+ */ +bool +CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode, bool orstronger) +{ + LOCKTAG tag; + + SET_LOCKTAG_OBJECT(tag, + InvalidOid, + classid, + InvalidOid, + 0); + + return LockHeldByMe(&tag, lockmode, orstronger); +} + /* * LockHasWaitersRelation * diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index b7abd18397d..c119c8f4ded 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -50,6 +50,9 @@ extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger); extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger); +extern bool CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode, + bool orstronger); + extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode); diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 902a7954101..a015c961d35 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -29,6 +29,7 @@ SUBDIRS = \ test_int128 \ test_integerset \ test_json_parser \ + test_listen_notify \ test_lfind \ test_lwlock_tranches \ test_misc \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 14fc761c4cf..6af33448d7b 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -28,6 +28,7 @@ subdir('test_ginpostinglist') subdir('test_int128') subdir('test_integerset') subdir('test_json_parser') +subdir('test_listen_notify') subdir('test_lfind') subdir('test_lwlock_tranches') subdir('test_misc') diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile new file mode 100644 index 00000000000..da1bf5bb1b7 --- /dev/null +++ b/src/test/modules/test_listen_notify/Makefile @@ -0,0 +1,17 @@ +# src/test/modules/test_listen_notify/Makefile + +MODULE = test_listen_notify +PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY 
support" + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_listen_notify +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build new file mode 100644 index 00000000000..a68052cd353 --- /dev/null +++ b/src/test/modules/test_listen_notify/meson.build @@ -0,0 +1,14 @@ +# Copyright (c) 2022-2025, PostgreSQL Global Development Group + +tests += { + 'name': 'test_listen_notify', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_xid_freeze.pl', + 't/002_aborted_tx_notifies.pl' + ], + }, +} + diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl new file mode 100644 index 00000000000..0a5130a042e --- /dev/null +++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl @@ -0,0 +1,66 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use File::Path qw(mkpath); +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +# Setup +$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound'); +$node->safe_psql('postgres', + 'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;' +); +$node->safe_psql('postgres', + 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true'); + +# --- Start Session 1 and leave it idle in transaction +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe('listen s;', "Session 1 listens to 's'"); +$psql_session1->query_safe('begin;', "Session 1 starts a transaction"); + +# --- Session 2, multiple notify's, and commit --- +for my 
$i (1 .. 10) +{ + $node->safe_psql( + 'postgres', " + BEGIN; + NOTIFY s, '$i'; + COMMIT;"); +} + +# Consume enough XIDs to trigger truncation +$node->safe_psql('postgres', 'select consume_xids(10000000);'); + +# Execute update so the frozen xid of "t" table is updated to a xid greater +# than consume_xids() result +$node->safe_psql('postgres', 'UPDATE t SET a = a+b;'); + +# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced. +my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); + +# Execute vacuum freeze on all databases +$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ], + "vacuumdb --all --freeze"); + +# Get the new datfrozenxid after vacuum freeze to ensure that is advanced but +# we can still get the notification status of the notification +my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); +ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced'); + +# On Session 1, commit and ensure that the all notifications is received +my $res = $psql_session1->query_safe('commit;', "commit listen s;"); +my $notifications_count = 0; +foreach my $i (split('\n', $res)) +{ + $notifications_count++; + like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/); +} +is($notifications_count, 10, 'received all committed notifications'); + +done_testing(); diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl new file mode 100644 index 00000000000..17fcb4b786e --- /dev/null +++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl @@ -0,0 +1,66 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use File::Path qw(mkpath); +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; 
+use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +# Test checks that listeners do not receive notifications from aborted +# transaction even if notifications have been added to the listen/notify +# queue. To reproduce it we use the fact that serializable conflicts +# are checked after tx adds notifications to the queue. + +# Setup +$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);'); + +# Listener +my $psql_listener = $node->background_psql('postgres'); +$psql_listener->query_safe('LISTEN ch;'); + +# Session1. Start SERIALIZABLE tx and add a notification. +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe(" + BEGIN ISOLATION LEVEL SERIALIZABLE; + SELECT * FROM t1; + INSERT INTO t1 DEFAULT VALUES; + NOTIFY ch,'committed'; +"); + +# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict +# with session1. +my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0); +$psql_session2->query_safe(" + BEGIN ISOLATION LEVEL SERIALIZABLE; + SELECT * FROM t1; + INSERT INTO t1 DEFAULT VALUES; + NOTIFY ch,'aborted'; +"); + +# Session1 should be committed successfully. Listeners must receive session1 +# notifications. +$psql_session1->query_safe("COMMIT;"); + +# Session2 should be aborted due to the conflict with session1. Transaction +# is aborted after adding notifications to the listen/notify queue, but +# listeners should not receive session2 notifications. 
+$psql_session2->query("COMMIT;"); + +# send another notification after aborted +$node->safe_psql('postgres', "NOTIFY ch, 'next_committed';"); + +# fetch notifications +my $res = $psql_listener->query_safe('begin; commit;'); + +# check received notifications +my @lines = split('\n', $res); +is(@lines, 2, 'received all committed notifications'); +like($lines[0], qr/Asynchronous notification "ch" with payload "committed" received/); +like($lines[1], qr/Asynchronous notification "ch" with payload "next_committed" received/); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e90af5b2ad3..e1c2384aa3d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -159,6 +159,7 @@ ArrayType AsyncQueueControl AsyncQueueEntry AsyncRequest +AtAbortNotifyInfo AttInMetadata AttStatsSlot AttoptCacheEntry -- 2.43.0
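
For readers who want the rollback idea from 0001 in isolation, here is a minimal standalone sketch in plain C. It is only a toy model and not the patch code: the names (ToyEntry, ToyPos, toy_add, toy_rollback), the fixed number of entries per page, and the absence of locking, SLRU access and queue wraparound are all assumptions made for illustration. It only shows the range walk from the saved previous head to the saved head, including the case where the range crosses a page boundary.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_PAGE_ENTRIES 4		/* hypothetical: entries per page */
#define TOY_NUM_PAGES    8		/* hypothetical: toy queue size, no wraparound */

typedef struct ToyEntry
{
	int			xid;			/* sender's (toy) transaction id */
	bool		committed;		/* set to true when the entry is queued */
} ToyEntry;

typedef struct ToyPos
{
	int			page;
	int			offset;
} ToyPos;

static ToyEntry toy_queue[TOY_NUM_PAGES][TOY_PAGE_ENTRIES];

static bool
toy_pos_equal(ToyPos a, ToyPos b)
{
	return a.page == b.page && a.offset == b.offset;
}

/* Advance by one entry; returns true if we moved on to a new page. */
static bool
toy_advance(ToyPos *pos)
{
	pos->offset++;
	if (pos->offset == TOY_PAGE_ENTRIES)
	{
		pos->offset = 0;
		pos->page++;
		return true;
	}
	return false;
}

/* Queue an entry at *head, optimistically marked as committed. */
static void
toy_add(ToyPos *head, int xid)
{
	toy_queue[head->page][head->offset].xid = xid;
	toy_queue[head->page][head->offset].committed = true;
	toy_advance(head);
}

/*
 * Mark every entry in [from, to) as not committed, one page at a time.
 * Returns the number of entries changed.
 */
static int
toy_rollback(ToyPos from, ToyPos to)
{
	int			marked = 0;

	while (!toy_pos_equal(from, to))
	{
		/* the real code would take the page lock here */
		for (;;)
		{
			ToyEntry   *e;

			if (toy_pos_equal(from, to))
				break;

			e = &toy_queue[from.page][from.offset];
			assert(e->committed);	/* entries are queued as committed */
			e->committed = false;
			marked++;

			if (toy_advance(&from))
				break;			/* page boundary: go back to the outer loop */
		}
		/* ... and release the page lock here */
	}
	return marked;
}
```

A caller would save the head before queueing (previousHead), queue its entries with toy_add(), save the head again, and pass both positions to toy_rollback() on abort. Entries before or after that range keep committed = true.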
From 00b372089d9c01c66b5e919187238ffb6ce45f0d Mon Sep 17 00:00:00 2001 From: Arseniy Mukhin <[email protected]> Date: Fri, 26 Sep 2025 13:17:59 +0300 Subject: [PATCH v5 2/2] tap tests --- src/backend/commands/async.c | 5 + src/include/storage/lmgr.h | 2 +- .../modules/test_listen_notify/meson.build | 6 +- .../t/002_aborted_tx_notifies.pl | 32 +++-- ...tpone_in_progress_aborted_notifications.pl | 114 ++++++++++++++++++ 5 files changed, 146 insertions(+), 13 deletions(-) create mode 100644 src/test/modules/test_listen_notify/t/003_postpone_in_progress_aborted_notifications.pl diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e7c0e16d57..f6f8fa6134c 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -164,6 +164,7 @@ #include "utils/ps_status.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "utils/injection_point.h" /* @@ -2138,6 +2139,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * because our transaction cannot (yet) have queued any * messages. */ + + INJECTION_POINT("listen-notify-in-progress-notification", NULL); *current = thisentry; reachedStop = true; break; @@ -2497,6 +2500,8 @@ asyncQueueRollbackNotifications(void) QueuePosition current = atAbortInfo->previousHead; QueuePosition head = atAbortInfo->head; + INJECTION_POINT("listen-notify-notifications-rollback", NULL); + /* * Iterates from the position saved at the beginning of the transaction * (previousHead) to the current head of the queue. 
We do this to mark all diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index c119c8f4ded..3d34e61772d 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -51,7 +51,7 @@ extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger); extern bool CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode, - bool orstronger); + bool orstronger); extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build index a68052cd353..565f4ec8ef0 100644 --- a/src/test/modules/test_listen_notify/meson.build +++ b/src/test/modules/test_listen_notify/meson.build @@ -5,9 +5,13 @@ tests += { 'sd': meson.current_source_dir(), 'bd': meson.current_build_dir(), 'tap': { + 'env': { + 'enable_injection_points': get_option('injection_points') ? 'yes' : 'no', + }, 'tests': [ 't/001_xid_freeze.pl', - 't/002_aborted_tx_notifies.pl' + 't/002_aborted_tx_notifies.pl', + 't/003_postpone_in_progress_aborted_notifications.pl' ], }, } diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl index 17fcb4b786e..74c4ae9fa9d 100644 --- a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl +++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl @@ -7,15 +7,15 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -my $node = PostgreSQL::Test::Cluster->new('node'); -$node->init; -$node->start; - # Test checks that listeners do not receive notifications from aborted # transaction even if notifications have been added to the listen/notify # queue. To reproduce it we use the fact that serializable conflicts # are checked after tx adds notifications to the queue. 
+my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + # Setup $node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);'); @@ -29,7 +29,8 @@ $psql_session1->query_safe(" BEGIN ISOLATION LEVEL SERIALIZABLE; SELECT * FROM t1; INSERT INTO t1 DEFAULT VALUES; - NOTIFY ch,'committed'; + NOTIFY ch,'committed_0'; + NOTIFY ch,'committed_1'; "); # Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict @@ -39,9 +40,15 @@ $psql_session2->query_safe(" BEGIN ISOLATION LEVEL SERIALIZABLE; SELECT * FROM t1; INSERT INTO t1 DEFAULT VALUES; - NOTIFY ch,'aborted'; "); +# Send notifications that should not be eventually delivered, as session2 +# transaction will be aborted. +my $message = 'aborted_' . 'a' x 1000; +for (my $i = 0; $i < 10; $i++) { + $psql_session2->query_safe("NOTIFY ch, '$i$message'"); +} + # Session1 should be committed successfully. Listeners must receive session1 # notifications. $psql_session1->query_safe("COMMIT;"); @@ -51,16 +58,19 @@ $psql_session1->query_safe("COMMIT;"); # listeners should not receive session2 notifications. 
$psql_session2->query("COMMIT;"); -# send another notification after aborted -$node->safe_psql('postgres', "NOTIFY ch, 'next_committed';"); +# send more notifications after aborted +$node->safe_psql('postgres', "NOTIFY ch, 'committed_2';"); +$node->safe_psql('postgres', "NOTIFY ch, 'committed_3';"); # fetch notifications my $res = $psql_listener->query_safe('begin; commit;'); # check received notifications my @lines = split('\n', $res); -is(@lines, 2, 'received all committed notifications'); -like($lines[0], qr/Asynchronous notification "ch" with payload "committed" received/); -like($lines[1], qr/Asynchronous notification "ch" with payload "next_committed" received/); +is(@lines, 4, 'received all committed notifications'); +for (my $i = 0; $i < 4; $i++) { + like($lines[$i], qr/Asynchronous notification "ch" with payload "committed_$i" received/); +} + done_testing(); diff --git a/src/test/modules/test_listen_notify/t/003_postpone_in_progress_aborted_notifications.pl b/src/test/modules/test_listen_notify/t/003_postpone_in_progress_aborted_notifications.pl new file mode 100644 index 00000000000..b53e6a1cdaa --- /dev/null +++ b/src/test/modules/test_listen_notify/t/003_postpone_in_progress_aborted_notifications.pl @@ -0,0 +1,114 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use File::Path qw(mkpath); +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test checks that listeners do not receive notifications from aborted +# even if the see such notifications in the queue right before the moment +# aborted transaction marks its notifications as 'committed = false'. To +# reproduce it we use the fact that serializable conflicts are checked after +# tx adds notifications to the queue. 
+ +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +$node->safe_psql('postgres', 'CREATE EXTENSION injection_points'); + +# Setup +$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);'); + +# Injection points setup +$node->safe_psql('postgres',"SELECT injection_points_attach('listen-notify-notifications-rollback', 'wait')"); +$node->safe_psql('postgres',"SELECT injection_points_attach('listen-notify-in-progress-notification', 'notice')"); + +# Session1. Start SERIALIZABLE tx and add a notification. +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe(" + BEGIN ISOLATION LEVEL SERIALIZABLE; + SELECT * FROM t1; + INSERT INTO t1 DEFAULT VALUES; + NOTIFY ch,'committed'; +"); + +# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict +# with session1. +my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0); +$psql_session2->query_safe(" + BEGIN ISOLATION LEVEL SERIALIZABLE; + SELECT * FROM t1; + INSERT INTO t1 DEFAULT VALUES; + NOTIFY ch,'aborted'; +"); + +# Session1 should be committed successfully and publish notification. +$psql_session1->query_safe("COMMIT;"); + +# Session2 should be aborted due to the conflict with session1. Transaction +# is aborted after adding notifications to the listen/notify queue, so we have notification of +# aborted transaction in the queue. 
Session2 should start sleeping on injection point right before +# marking its notifications as 'committed = false' +$psql_session2->query_until( + qr/start/, q( + \echo start + COMMIT; +)); + +$node->wait_for_event('client backend', 'listen-notify-notifications-rollback'); + +# Setup listener +my $psql_listener = $node->background_psql('postgres'); +$psql_listener->query_safe("SET log_min_messages = 'NOTICE'"); + +# At the moment listener should skip the first committed notification (as it was committed before we started listening) +# and stop on the first aborted notification as notify transaction is still in progress and backend is sleeping. +my $log_offset = -s $node->logfile; +my $res = $psql_listener->query('LISTEN ch;'); + +# Check that we touched session2's pending notification and triggered injection point +$node->wait_for_log(qr/notice triggered for injection point listen-notify-in-progress-notification/, $log_offset); + +# Check listener has no notifications +my @lines = split('\n', $res); +is(@lines, 0, 'received no notifications'); + +# Wakeup backend with aborted transaction +$node->safe_psql('postgres',"SELECT injection_points_wakeup('listen-notify-notifications-rollback');"); + +# Send another notification after aborted +$node->safe_psql('postgres', "NOTIFY ch, 'next_committed'"); + +# Clean stderr as workaround of the current bug with background psql hanging +$psql_listener->{stderr} = ""; + +# Fetch the rest notifications +$res = $psql_listener->query_safe('begin; commit;'); + + +# Now aborted transaction is completed, so listener must skip aborted notifications and get next_committed +@lines = split('\n', $res); +is(@lines, 1, 'received all committed notifications'); +like($lines[0], qr/Asynchronous notification "ch" with payload "next_committed" received/); + +# Injection points cleanup +$node->safe_psql('postgres',"SELECT injection_points_detach('listen-notify-notifications-rollback');"); +$node->safe_psql('postgres',"SELECT 
injection_points_detach('listen-notify-in-progress-notification');"); + +done_testing(); -- 2.43.0
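
The behavior that 003 checks can be summarized with a small standalone sketch in plain C. This is a simplified model under stated assumptions, not the code in async.c: ToyNotification, toy_classify and toy_scan are hypothetical names, and the sender_in_progress flag stands in for the snapshot check the real listener does. The point is only the ordering of the decisions: stop at an entry whose sender is still in progress, deliver it if committed is true, otherwise skip it.

```c
#include <stdbool.h>

typedef enum
{
	TOY_DELIVER,				/* sender finished and entry is committed */
	TOY_SKIP,					/* sender finished and entry is not committed */
	TOY_STOP					/* sender still in progress: retry later */
} ToyAction;

typedef struct ToyNotification
{
	bool		sender_in_progress; /* assumption: stands in for the snapshot check */
	bool		committed;			/* the new per-entry flag */
} ToyNotification;

static ToyAction
toy_classify(const ToyNotification *n)
{
	if (n->sender_in_progress)
		return TOY_STOP;
	return n->committed ? TOY_DELIVER : TOY_SKIP;
}

/*
 * Scan the queue starting at *pos. Returns how many notifications were
 * delivered and leaves *pos at the first entry that could not be processed
 * yet (or at len if everything was processed).
 */
static int
toy_scan(const ToyNotification *queue, int len, int *pos)
{
	int			delivered = 0;

	while (*pos < len)
	{
		ToyAction	action = toy_classify(&queue[*pos]);

		if (action == TOY_STOP)
			break;				/* postpone: do not advance past this entry */
		if (action == TOY_DELIVER)
			delivered++;
		(*pos)++;
	}
	return delivered;
}
```

In this model, an entry that is still flagged committed = true but whose sender has not finished is never delivered. The listener keeps its position on it and only decides once the sender has either committed or marked the entry as not committed.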
