Hi,

On 4/24/23 8:24 AM, Amit Kapila wrote:
On Mon, Apr 24, 2023 at 11:24 AM Drouvot, Bertrand
<bertranddrouvot...@gmail.com> wrote:


Few comments:
============

Thanks for looking at it!

1.
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'max_replication_slots = 4');

Why do we need slots on the subscriber?


Good point, it's not needed. I guess I missed it during my initial patch
cleanup.

Fixed in V3 attached.

2.
+# Speed up the subscription creation
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+
+# Explicitly shut down psql instance gracefully - to avoid hangs
+# or worse on windows
+$psql_subscriber{subscriber_stdin} .= "\\q\n";
+$psql_subscriber{run}->finish;
+
+# Insert some rows on the primary
+$node_primary->safe_psql('postgres',
+ qq[INSERT INTO tab_rep select generate_series(1,10);]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# To speed up the wait_for_subscription_sync
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');

It is not clear to me why you need to do pg_log_standby_snapshot() twice.

That's because there are 2 logical slot creations that have to be done on the
standby.

The one for the subscription:

"
CREATE_REPLICATION_SLOT "tap_sub" LOGICAL pgoutput (SNAPSHOT 'nothing')
"

And the one for the data sync:

"
CREATE_REPLICATION_SLOT "pg_16389_sync_16384_7225540800768250444" LOGICAL 
pgoutput (SNAPSHOT 'use')
"

Without the second pg_log_standby_snapshot(), wait_for_subscription_sync()
would spend some time polling until
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"
returns true.

Adding a comment in V3 to explain the need for the second 
pg_log_standby_snapshot().


3. Why do you need $psql_subscriber to be used in a different way
instead of using safe_psql as is used for node_primary?


Because safe_psql() is synchronous: it would wait for activity on the primary,
and we would not be able to launch pg_log_standby_snapshot() on the primary
while it waits. The background psql session ($psql_subscriber) lets us avoid
waiting synchronously.

Also adding a comment in V3 to explain why safe_psql() is not being used here.
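
To illustrate the idea outside of the test suite, here is a minimal sketch of
the same non-blocking pattern. It is only an assumption-laden stand-in: the
child is a small perl one-liner that echoes its stdin instead of a real psql
session, and the "other work" comment marks where the test calls
pg_log_standby_snapshot() on the primary. Only IPC::Run::start(), pump_nb()
and finish() are taken from the patch.

```perl
use strict;
use warnings;
use IPC::Run ();

my ($in, $out, $err) = ('', '', '');
my $timer = IPC::Run::timer(30);

# Stand-in for the psql session: a child that echoes each stdin line.
my $h = IPC::Run::start(
    [ $^X, '-ne', 'print "got: $_"' ],
    '<',  \$in,
    '>',  \$out,
    '2>', \$err,
    $timer);

# Queue a command and hand it to the child without blocking on the result.
$in .= "first command\n";
$h->pump_nb;

# Other work can happen here while the child is busy (in the test:
# pg_log_standby_snapshot() on the primary).

# Close the child's stdin, wait for it to exit and collect its output.
$h->finish;
print $out;
```

With safe_psql() the equivalent of the "other work" step could only run after
the command had returned.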

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From f74fb99ff67e47bb78bdff62bdf1ac824a8fb24b Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 24 Apr 2023 05:13:23 +0000
Subject: [PATCH v3 2/2] Add two tests in 035_standby_logical_decoding.pl

Adding two tests, to verify that:

- subscription to the standby is possible
- invalidated logical slots do not lead to retaining WAL
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  11 +-
 .../t/035_standby_logical_decoding.pl         | 167 +++++++++++++++++-
 2 files changed, 173 insertions(+), 5 deletions(-)
   4.4% src/test/perl/PostgreSQL/Test/
  95.5% src/test/recovery/t/

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 6f7f4e5de4..819667d42a 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2644,7 +2644,16 @@ sub wait_for_catchup
        }
        if (!defined($target_lsn))
        {
-               $target_lsn = $self->lsn('write');
+               my $isrecovery = $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+               chomp($isrecovery);
+               if ($isrecovery eq 't')
+               {
+                       $target_lsn = $self->lsn('replay');
+               }
+               else
+               {
+                       $target_lsn = $self->lsn('write');
+               }
        }
        print "Waiting for replication conn "
          . $standby_name . "'s "
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index b8f5311fe9..647e203b0f 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -9,13 +9,19 @@ use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+my ($stdin,             $stdout,            $stderr,
+       $cascading_stdout,  $cascading_stderr,  $subscriber_stdin,
+       $subscriber_stdout, $subscriber_stderr, $ret,
+       $handle,            $slot);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 my $node_standby = PostgreSQL::Test::Cluster->new('standby');
 my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout    = IPC::Run::timer(2 * $default_timeout);
 my $res;
 
 # Name for the physical slot on primary
@@ -267,7 +273,8 @@ $node_standby->init_from_backup(
        has_streaming => 1,
        has_restoring => 1);
 $node_standby->append_conf('postgresql.conf',
-       qq[primary_slot_name = '$primary_slotname']);
+       qq[primary_slot_name = '$primary_slotname'
+       max_replication_slots = 5]);
 $node_standby->start;
 $node_primary->wait_for_replay_catchup($node_standby);
 $node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -285,6 +292,26 @@ $node_cascading_standby->append_conf('postgresql.conf',
 $node_cascading_standby->start;
 $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
 
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my %psql_subscriber = (
+       'subscriber_stdin'  => '',
+       'subscriber_stdout' => '',
+       'subscriber_stderr' => '');
+$psql_subscriber{run} = IPC::Run::start(
+       [ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
+       '<',
+       \$psql_subscriber{subscriber_stdin},
+       '>',
+       \$psql_subscriber{subscriber_stdout},
+       '2>',
+       \$psql_subscriber{subscriber_stderr},
+       $psql_timeout);
+
 ##################################################
 # Test that logical decoding on the standby
 # behaves correctly.
@@ -365,6 +392,65 @@ is( $node_primary->psql(
     3,
     'replaying logical slot from another database fails');
 
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres',
+       "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub for table tab_rep");
+
+# Subscribe on the standby
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+
+# Not using safe_psql() here as it would wait for activity on the primary
+# without being able to launch pg_log_standby_snapshot() on the primary while
+# waiting.
+# The background psql session (%psql_subscriber) avoids waiting synchronously.
+$psql_subscriber{subscriber_stdin} .=
+  qq[CREATE SUBSCRIPTION tap_sub CONNECTION '$standby_connstr' PUBLICATION tap_pub;];
+$psql_subscriber{subscriber_stdin} .= "\n";
+
+$psql_subscriber{run}->pump_nb();
+
+# Speed up the subscription creation
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+
+# Explicitly shut down psql instance gracefully - to avoid hangs
+# or worse on windows
+$psql_subscriber{subscriber_stdin} .= "\\q\n";
+$psql_subscriber{run}->finish;
+
+# Insert some rows on the primary
+$node_primary->safe_psql('postgres',
+       qq[INSERT INTO tab_rep select generate_series(1,10);]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# To speed up the wait_for_subscription_sync
+# Indeed a second logical slot (pg_%sync%: exact name coming from ReplicationSlotNameForTablesync())
+# is created under the hood on the standby for the data sync
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+# Check that the subscriber can see the rows inserted in the primary
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'check replicated inserts after subscription on standby');
+
+# We do not need the subscription and the subscriber anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->stop;
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
@@ -408,9 +494,82 @@ $node_standby->restart;
 check_slots_conflicting_status(1);
 
 ##################################################
-# Verify that invalidated logical slots do not lead to retaining WAL
+# Verify that invalidated logical slots do not lead to retaining WAL.
 ##################################################
-# XXXXX TODO
+
+# Get the restart_lsn from an invalidated slot
+my $restart_lsn = $node_standby->safe_psql('postgres',
+       "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflicting is true;"
+);
+
+chomp($restart_lsn);
+
+# Get the WAL file name associated to this lsn on the primary
+my $walfile_name = $node_primary->safe_psql('postgres',
+       "SELECT pg_walfile_name('$restart_lsn')");
+
+chomp($walfile_name);
+
+# Check the WAL file is still on the primary
+ok(-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+       "WAL file still on the primary");
+
+# Get the number of WAL files on the standby
+my $nb_standby_files = $node_standby->safe_psql('postgres',
+       "SELECT COUNT(*) FROM pg_ls_dir('pg_wal')");
+
+chomp($nb_standby_files);
+
+# Switch WAL files on the primary
+my @c = (1 .. $nb_standby_files);
+
+$node_primary->safe_psql('postgres', "create table retain_test(a int)");
+
+for (@c)
+{
+       $node_primary->safe_psql(
+               'postgres', "SELECT pg_switch_wal();
+                                          insert into retain_test values("
+                 . $_ . ");");
+}
+
+# Ask for a checkpoint
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Check that the WAL file has not been retained on the primary
+ok(!-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+       "WAL file not on the primary anymore");
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Generate another WAL switch, more activity and a checkpoint
+$node_primary->safe_psql(
+       'postgres', "SELECT pg_switch_wal();
+                                         insert into retain_test values(1);");
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Verify that the wal file has not been retained on the standby
+my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
+
+# We cannot test immediately whether the WAL file still exists.
+# We need to give the standby some time to actually "remove" it.
+my $i = 0;
+while (1)
+{
+       last if !-f $standby_walfile;
+       if ($i++ == 10 * $default_timeout)
+       {
+               die
+                 "could not determine if WAL file has been retained or not, can't continue";
+       }
+       usleep(100_000);
+}
+
+ok(1, "invalidated logical slots do not lead to retaining WAL");
 
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
-- 
2.34.1

From dbbe19e8bec4e664f8e2d7093208bed1e6f57e31 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 24 Apr 2023 05:10:53 +0000
Subject: [PATCH v3 1/2] Lower log level in 035_standby_logical_decoding.pl

Lower log level in 035_standby_logical_decoding.pl to shrink the output size
and speed up the test a bit.
---
 src/test/recovery/t/035_standby_logical_decoding.pl | 2 --
 1 file changed, 2 deletions(-)
 100.0% src/test/recovery/t/

diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 20838d49b5..b8f5311fe9 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -235,8 +235,6 @@ $node_primary->append_conf('postgresql.conf', q{
 wal_level = 'logical'
 max_replication_slots = 4
 max_wal_senders = 4
-log_min_messages = 'debug2'
-log_error_verbosity = verbose
 });
 $node_primary->dump_info;
 $node_primary->start;
-- 
2.34.1
