This is another version of the patch. It now includes tests.
On Fri, Nov 1, 2024 at 1:42 PM Torsten Förtsch <tfoertsch...@gmail.com> wrote:
> On Wed, Oct 30, 2024 at 9:01 AM Peter Eisentraut <pe...@eisentraut.org> wrote:
>
>> On 27.10.24 13:37, Torsten Förtsch wrote:
>> > The attached patch enables pg_recvlogical to create a temporary slot.
>>
>> I think you should explain a bit why you want to do that, what use case
>> or scenario this is meant to address.
>
> In my particular case I want to filter for messages sent by
> pg_logical_emit_message(). I don't care much about getting ALL the changes.
> If the replication slot disappears and a few changes are lost, it does not
> matter. So a temporary replication slot makes more sense than creating a
> permanent one and then having to make sure later that it does not retain
> WAL forever.
>
> I can also imagine this as a tool to monitor changes for a while and then
> simply disconnect without the need to remove the slot.
>
> Why am I interested in pg_logical_emit_message()? We have an application
> with relatively complicated transactions involving multiple tables. Some of
> them use pg_notify(). We also have synchronous replication. Sometimes I see
> lock avalanches that can be traced to the "AccessExclusiveLock on object 0
> of class 1262 of database 0". This lock is taken during commit when the
> notification is inserted into the queue. After that, the backend waits for
> the confirmation by the sync replica. So this lock forces all transactions
> that send notifications to commit one after another.
>
> Now, if the application uses pg_logical_emit_message() instead, then I
> think there is no equivalent lock. I understand the semantics are a bit
> different (timing) but close enough for my use case. Another advantage of
> pg_logical_emit_message() is the ability to send messages even if the
> transaction is aborted.
>
> I was in the process of experimenting with this idea and found that
> pg_recvlogical can:
> - only create the slot, or
> - create the slot and immediately use it, or
> - try to create the slot and, if the slot is already there, use it
>
> So, why not also allow it to create a temporary slot?
>
From 82d6256fb621e2dc75a67603bef5b511cfdf0e27 Mon Sep 17 00:00:00 2001 From: Torsten Foertsch <tfoertsch...@gmail.com> Date: Fri, 1 Nov 2024 13:56:43 +0100 Subject: [PATCH v2] Allow pg_recvlogical to create temp slots With this patch pg_recvlogical can be called with the --temporary-slot option together with --create-slot and --start. If called that way, the created slot exists only for the duration of the connection. If the connection is dropped and reestablished by pg_recvlogical, a new temp slot by the same name is created. If the slot exists and --if-not-exists is not passed, then pg_recvlogical fails. If the slot exists and --if-not-exists is given, the slot will be used. In addition a few tests have been added for previously untested options. Discussion: https://www.postgresql.org/message-id/CAKkG4_%3DoMpa-AXhw9m044ZH5YdneNFTp6WxG_kEPA0cTkfiMNQ%40mail.gmail.com --- src/bin/pg_basebackup/pg_recvlogical.c | 33 ++++++++-- src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 65 +++++++++++++++++++ 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 3db520ed38..4a131cca6d 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -50,6 +50,7 @@ static int fsync_interval = 10 * 1000; /* 10 sec = default */ static XLogRecPtr startpos = InvalidXLogRecPtr; static XLogRecPtr endpos = InvalidXLogRecPtr; static bool do_create_slot = false; +static bool slot_is_temporary = false; static bool slot_exists_ok = false; static bool do_start_slot = false; static bool do_drop_slot = false; @@ -104,6 +105,7 @@ usage(void) printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n")); + printf(_(" --temporary-slot the slot created exists until the connection is dropped\n")); printf(_(" -t, --two-phase 
enable decoding of prepared transactions when creating a slot\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); @@ -216,7 +218,7 @@ StreamLogicalLog(void) char *copybuf = NULL; TimestampTz last_status = -1; int i; - PQExpBuffer query; + PQExpBuffer query = NULL; XLogRecPtr cur_record_lsn; output_written_lsn = InvalidXLogRecPtr; @@ -227,10 +229,24 @@ StreamLogicalLog(void) * Connect in replication mode to the server */ if (!conn) + { conn = GetConnection(); - if (!conn) - /* Error message already written in GetConnection() */ - return; + if (!conn) + /* Error message already written in GetConnection() */ + return; + + /* Recreate a replication slot. */ + if (do_create_slot && slot_is_temporary) + { + if (verbose) + pg_log_info("recreating replication slot \"%s\"", replication_slot); + + if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary, + false, false, slot_exists_ok, two_phase)) + goto error; + startpos = InvalidXLogRecPtr; + } + } /* * Start the replication @@ -656,7 +672,8 @@ error: PQfreemem(copybuf); copybuf = NULL; } - destroyPQExpBuffer(query); + if (query != NULL) + destroyPQExpBuffer(query); PQfinish(conn); conn = NULL; } @@ -719,6 +736,7 @@ main(int argc, char **argv) {"start", no_argument, NULL, 2}, {"drop-slot", no_argument, NULL, 3}, {"if-not-exists", no_argument, NULL, 4}, + {"temporary-slot", no_argument, NULL, 5}, {NULL, 0, NULL, 0} }; int c; @@ -847,6 +865,9 @@ main(int argc, char **argv) case 4: slot_exists_ok = true; break; + case 5: + slot_is_temporary = true; + break; default: /* getopt_long already emitted a complaint */ @@ -981,7 +1002,7 @@ main(int argc, char **argv) if (verbose) pg_log_info("creating replication slot \"%s\"", replication_slot); - if (!CreateReplicationSlot(conn, replication_slot, plugin, false, + if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary, false, false, slot_exists_ok, two_phase)) 
exit(1); startpos = InvalidXLogRecPtr; diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl index 8432e5660d..0b46574fc1 100644 --- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl +++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl @@ -65,6 +65,23 @@ $node->command_ok( ], 'replayed a transaction'); +$node->command_fails( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot' + ], + 'slot cannot be created again'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot', + '--if-not-exists' + ], + 'if-not-exists'); + $node->command_ok( [ 'pg_recvlogical', '-S', @@ -73,6 +90,54 @@ $node->command_ok( ], 'slot dropped'); +# slot() returns a hash with all values set to the empty string if the +# slot does not exist. +$slot = $node->slot('test'); +is(0+grep({$_ ne ''} values %$slot), 0, 'slot does not exist anymore'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary slot'); + +$slot = $node->slot('test'); +is(0+grep({$_ ne ''} values %$slot), 0, 'temp slot is gone when connection is dropped'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot', + ], + 'create slot again'); + +$node->command_fails( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary slot fails if target slot exists'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', '--if-not-exists', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary with --if-not-exists'); + 
+$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--drop-slot' + ], + 'slot dropped again'); + #test with two-phase option enabled $node->command_ok( [ -- 2.34.1