This is another version of the patch. It now includes tests.

On Fri, Nov 1, 2024 at 1:42 PM Torsten Förtsch <tfoertsch...@gmail.com>
wrote:

> On Wed, Oct 30, 2024 at 9:01 AM Peter Eisentraut <pe...@eisentraut.org>
> wrote:
>
>> On 27.10.24 13:37, Torsten Förtsch wrote:
>> > The attached patch enables pg_recvlogical to create a temporary slot.
>>
>> I think you should explain a bit why you want to do that, what use case
>> or scenario this is meant to address.
>>
>
> In my particular case I want to filter for messages sent by
> pg_logical_emit_message(). I don't care much about getting ALL the changes.
> If the replication slot disappears and a few changes are lost it does not
> matter. So, a temporary rep slot makes more sense than creating one and
> then having to make sure it is not retaining wal forever later.
>
> I can imagine this also as a tool to monitor changes for a while and then
> simply disconnect without the need to remove the slot.
>
> Why am I interested in pg_logical_emit_message()? We have an application
> with relatively complicated transactions involving multiple tables. Some of
> them use pg_notify(). We also have synchronous replication. Sometimes I see
> lock avalanches that can be traced to the "AccessExclusiveLock on object 0
> of class 1262 of database 0". This lock is taken during commit when the
> notification is inserted in the queue. After that the backend waits for the
> confirmation by the sync replica. So, this lock presses all the
> transactions sending notifications into a sequence.
>
> Now, if the application uses pg_logical_emit_message() instead, then I
> think there is no equivalent lock. I understand the semantics are a bit
> different (timing) but close enough for my use case. Another advantage of
> pg_logical_emit_message() is the ability to send them even if the
> transaction is aborted.
>
> I was in the process of experimenting with this idea and found that
> pg_recvlogical can:
> - only create the slot or
> - create the slot and immediately use it
> - try to create the slot and if the slot is already there use it
>
> So, why not also allow it to create a temporary slot?
>
From 82d6256fb621e2dc75a67603bef5b511cfdf0e27 Mon Sep 17 00:00:00 2001
From: Torsten Foertsch <tfoertsch...@gmail.com>
Date: Fri, 1 Nov 2024 13:56:43 +0100
Subject: [PATCH v2] Allow pg_recvlogical to create temp slots

With this patch pg_recvlogical can be called with the
--temporary-slot option together with --create-slot and --start.

If called that way, the created slot exists only for the duration
of the connection. If the connection is dropped and reestablished
by pg_recvlogical, a new temp slot by the same name is created.

If the slot exists and --if-not-exists is not passed, then
pg_recvlogical fails. If the slot exists and --if-not-exists is
given, the slot will be used.

In addition a few tests have been added for previously untested
options.

Discussion: https://www.postgresql.org/message-id/CAKkG4_%3DoMpa-AXhw9m044ZH5YdneNFTp6WxG_kEPA0cTkfiMNQ%40mail.gmail.com
---
 src/bin/pg_basebackup/pg_recvlogical.c        | 33 ++++++++--
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 65 +++++++++++++++++++
 2 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 3db520ed38..4a131cca6d 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -50,6 +50,7 @@ static int	fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
 static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
+static bool slot_is_temporary = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
 static bool do_drop_slot = false;
@@ -104,6 +105,7 @@ usage(void)
 	printf(_("  -s, --status-interval=SECS\n"
 			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
 	printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
+	printf(_("      --temporary-slot   the slot created exists until the connection is dropped\n"));
 	printf(_("  -t, --two-phase        enable decoding of prepared transactions when creating a slot\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
@@ -216,7 +218,7 @@ StreamLogicalLog(void)
 	char	   *copybuf = NULL;
 	TimestampTz last_status = -1;
 	int			i;
-	PQExpBuffer query;
+	PQExpBuffer query = NULL;
 	XLogRecPtr	cur_record_lsn;
 
 	output_written_lsn = InvalidXLogRecPtr;
@@ -227,10 +229,24 @@ StreamLogicalLog(void)
 	 * Connect in replication mode to the server
 	 */
 	if (!conn)
+	{
 		conn = GetConnection();
-	if (!conn)
-		/* Error message already written in GetConnection() */
-		return;
+		if (!conn)
+			/* Error message already written in GetConnection() */
+			return;
+
+		/* Recreate a replication slot. */
+		if (do_create_slot && slot_is_temporary)
+		{
+			if (verbose)
+				pg_log_info("recreating replication slot \"%s\"", replication_slot);
+
+			if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary,
+									   false, false, slot_exists_ok, two_phase))
+				goto error;
+			startpos = InvalidXLogRecPtr;
+		}
+	}
 
 	/*
 	 * Start the replication
@@ -656,7 +672,8 @@ error:
 		PQfreemem(copybuf);
 		copybuf = NULL;
 	}
-	destroyPQExpBuffer(query);
+	if (query != NULL)
+		destroyPQExpBuffer(query);
 	PQfinish(conn);
 	conn = NULL;
 }
@@ -719,6 +736,7 @@ main(int argc, char **argv)
 		{"start", no_argument, NULL, 2},
 		{"drop-slot", no_argument, NULL, 3},
 		{"if-not-exists", no_argument, NULL, 4},
+		{"temporary-slot", no_argument, NULL, 5},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -847,6 +865,9 @@ main(int argc, char **argv)
 			case 4:
 				slot_exists_ok = true;
 				break;
+			case 5:
+				slot_is_temporary = true;
+				break;
 
 			default:
 				/* getopt_long already emitted a complaint */
@@ -981,7 +1002,7 @@ main(int argc, char **argv)
 		if (verbose)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
-		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
+		if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary,
 								   false, false, slot_exists_ok, two_phase))
 			exit(1);
 		startpos = InvalidXLogRecPtr;
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
index 8432e5660d..0b46574fc1 100644
--- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -65,6 +65,23 @@ $node->command_ok(
 	],
 	'replayed a transaction');
 
+$node->command_fails(
+	[
+		'pg_recvlogical', '-S',
+		'test', '-d',
+		$node->connstr('postgres'), '--create-slot'
+	],
+	'slot cannot be created again');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S',
+		'test', '-d',
+		$node->connstr('postgres'), '--create-slot',
+                '--if-not-exists'
+	],
+	'if-not-exists');
+
 $node->command_ok(
 	[
 		'pg_recvlogical', '-S',
@@ -73,6 +90,54 @@ $node->command_ok(
 	],
 	'slot dropped');
 
+# slot() returns a hash with all values set to the empty string if the
+# slot does not exist.
+$slot = $node->slot('test');
+is(0+grep({$_ ne ''} values %$slot), 0, 'slot does not exist anymore');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+		'--create-slot', '--temporary-slot',
+		'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+	],
+	'create temporary slot');
+
+$slot = $node->slot('test');
+is(0+grep({$_ ne ''} values %$slot), 0, 'temp slot is gone when connection is dropped');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S',
+		'test', '-d',
+		$node->connstr('postgres'), '--create-slot',
+	],
+	'create slot again');
+
+$node->command_fails(
+	[
+		'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+		'--create-slot', '--temporary-slot',
+		'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+	],
+	'create temporary slot fails if target slot exists');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+		'--create-slot', '--temporary-slot', '--if-not-exists',
+		'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+	],
+	'create temporary with --if-not-exists');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S',
+		'test', '-d',
+		$node->connstr('postgres'), '--drop-slot'
+	],
+	'slot dropped again');
+
 #test with two-phase option enabled
 $node->command_ok(
 	[
-- 
2.34.1

Reply via email to