On Tue, Jul 26, 2022 at 2:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Jul 26, 2022 at 7:07 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > Hi,
> >
> > In tap tests for logical replication, we have the following code in many 
> > places:
> >
> > $node_publisher->wait_for_catchup('tap_sub');
> > my $synced_query =
> >   "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT
> > IN ('r', 's');";
> > $node_subscriber->poll_query_until('postgres', $synced_query)
> >   or die "Timed out while waiting for subscriber to synchronize data";
> >
> > Also, we sometime forgot to check either one, like we fixed in commit
> > 1f50918a6fb02207d151e7cb4aae4c36de9d827c.
> >
> > I think we can have a new function to wait for all subscriptions to
> > synchronize data. The attached patch introduce a new function
> > wait_for_subscription_sync(). With this function, we can replace the
> > above code with this one function as follows:
> >
> > $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
> >
>
> +1. This reduces quite some code in various tests and will make it
> easier to write future tests.
>
> Few comments/questions:
> ====================
> 1.
> -$node_publisher->wait_for_catchup('mysub1');
> -
> -# Also wait for initial table sync to finish.
> -my $synced_query =
> -  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT
> IN ('r', 's');";
> -$node_subscriber->poll_query_until('postgres', $synced_query)
> -  or die "Timed out while waiting for subscriber to synchronize data";
> -
>  # Also wait for initial table sync to finish.
> -$node_subscriber->poll_query_until('postgres', $synced_query)
> -  or die "Timed out while waiting for subscriber to synchronize data";
> +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
>
> It seems to me without your patch there is an extra poll in the above
> test. If so, we can probably remove that in a separate patch?

Agreed.

>
> 2.
> +    # wait for the replication to catchup if required.
> +    if (defined($publisher))
> +    {
> + croak 'subscription name must be specified' unless defined($subname);
> + $publisher->wait_for_catchup($subname, 'replay');
> +    }
> +
> +    # then, wait for all table states to be ready.
> +    print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
> +    my $query = qq[SELECT count(1) = 0
> +     FROM pg_subscription_rel
> +     WHERE srsubstate NOT IN ('r', 's');];
> +    $self->poll_query_until($dbname, $query)
> + or croak "timed out waiting for subscriber to synchronize data";
>
> In the tests, I noticed that a few places did wait_for_catchup after
> the subscription check, and at other places, we did that check before
> as you have it here. Ideally, I think wait_for_catchup should be after
> confirming the initial sync is over as without initial sync, the
> publisher node won't be completely in sync with the subscriber.

What do you mean by the last sentence? I thought the order doesn't
matter here. Even if we do wait_for_catchup first then the
subscription check, we can make sure that the apply worker caught up
and table synchronization has been done, no?

>
> 3. In the code quoted in the previous point, why did you pass the
> second parameter as 'replay' when we have not used that in the tests
> otherwise?

It makes sure to use the (current) default value of $mode of
wait_for_catchup(). But probably it's not necessary, I've removed it.

I've attached an updated patch as well as a patch to remove duplicated
waits in 007_ddl.pl.

Regards,

-- 
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
From 35c7b74b66a179b14fc97a012309f3d3ee9a7e26 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 26 Jul 2022 16:37:26 +0900
Subject: [PATCH v2 1/2] Remove duplicated wait for subscription
 synchronization in 007_ddl.pl

---
 src/test/subscription/t/007_ddl.pl | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index cdd6b119ff..01df54229c 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -57,10 +57,6 @@ my $synced_query =
 $node_subscriber->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";
 
-# Also wait for initial table sync to finish.
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
 # Specifying non-existent publication along with add publication.
 ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
 	"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
-- 
2.24.3 (Apple Git-128)

From 229f8fe990f56a6558311de5eb75e35275b50609 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 26 Jul 2022 09:43:17 +0900
Subject: [PATCH v2 2/2] Introduce wait_for_subscription_sync for TAP tests

---
 src/test/perl/PostgreSQL/Test/Cluster.pm      | 43 +++++++++++++++++
 src/test/subscription/t/001_rep_changes.pl    | 14 +-----
 src/test/subscription/t/002_types.pl          |  7 +--
 src/test/subscription/t/004_sync.pl           | 16 ++-----
 src/test/subscription/t/005_encoding.pl       |  7 +--
 src/test/subscription/t/006_rewrite.pl        |  7 +--
 src/test/subscription/t/007_ddl.pl            |  7 +--
 src/test/subscription/t/008_diff_schema.pl    | 10 +---
 src/test/subscription/t/010_truncate.pl       |  8 +---
 src/test/subscription/t/011_generated.pl      |  5 +-
 src/test/subscription/t/013_partition.pl      | 20 +++-----
 src/test/subscription/t/014_binary.pl         |  5 +-
 src/test/subscription/t/015_stream.pl         |  7 +--
 src/test/subscription/t/016_stream_subxact.pl |  7 +--
 src/test/subscription/t/017_stream_ddl.pl     |  7 +--
 .../t/018_stream_subxact_abort.pl             |  7 +--
 .../t/019_stream_subxact_ddl_abort.pl         |  7 +--
 src/test/subscription/t/021_twophase.pl       | 14 +-----
 .../subscription/t/023_twophase_stream.pl     |  8 +---
 src/test/subscription/t/024_add_drop_pub.pl   | 18 ++-----
 .../t/025_rep_changes_for_schema.pl           | 16 ++-----
 src/test/subscription/t/027_nosuperuser.pl    |  7 +--
 src/test/subscription/t/028_row_filter.pl     | 23 +++------
 src/test/subscription/t/029_on_error.pl       |  5 +-
 src/test/subscription/t/030_origin.pl         | 15 ++----
 src/test/subscription/t/031_column_list.pl    | 48 +++++++------------
 src/test/subscription/t/100_bugs.pl           | 10 +---
 27 files changed, 111 insertions(+), 237 deletions(-)

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index c8c7bc5045..afa0def72f 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2648,6 +2648,49 @@ sub wait_for_slot_catchup
 
 =pod
 
+=item $node->wait_for_subscription_sync($node_publisher, $subname, $dbname)
+
+Wait for all tables in pg_subscription_rel until their states become ready
+(syncdone or ready).
+
+If the publisher node is given, check also if the subscriber has caught up
+to what has been committed on the primary. This is useful to ensure that the
+initial data synchronization has completed after creating a new subscription.
+
+If there is no active replication connection from this peer, waits until
+poll_query_until timeout.
+
+This is not a test. It die()s on failure.
+
+=cut
+
+sub wait_for_subscription_sync
+{
+    my ($self, $publisher, $subname, $dbname) = @_;
+    my $name = $self->name;
+
+    $dbname = defined($dbname) ? $dbname : 'postgres';
+
+    # wait for the replication to catchup if required.
+    if (defined($publisher))
+    {
+	croak 'subscription name must be specified' unless defined($subname);
+	$publisher->wait_for_catchup($subname);
+    }
+
+    # then, wait for all table states to be ready.
+    print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
+    my $query =	qq[SELECT count(1) = 0
+     FROM pg_subscription_rel
+     WHERE srsubstate NOT IN ('r', 's');];
+    $self->poll_query_until($dbname, $query)
+	or croak "timed out waiting for subscriber to synchronize data";
+    print "done\n";
+    return;
+}
+
+=pod
+
 =item $node->wait_for_log(regexp, offset)
 
 Waits for the contents of the server log file, starting at the given offset, to
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index f53b3b7db0..be2acbdaf0 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -102,13 +102,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
@@ -237,13 +232,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
 );
 
-$node_publisher->wait_for_catchup('tap_sub_temp1');
-
 # Also wait for initial table sync to finish
-$synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1');
 
 # Subscriber table will have no rows initially
 $result =
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
index 3f1f00f7c8..d6c6f49327 100644
--- a/src/test/subscription/t/002_types.pl
+++ b/src/test/subscription/t/002_types.pl
@@ -114,13 +114,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
 # Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Insert initial test data
 $node_publisher->safe_psql(
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index cf61fc1e0f..957e5e7cdf 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
@@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
 $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
 
 # wait for sync to finish this time
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # check that all data is synced
 $result =
@@ -107,8 +101,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # and wait for data sync to finish again
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # check that all data is synced
 $result =
@@ -133,8 +126,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
 
 # wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM tab_rep_next");
diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl
index 38a74a897f..c1a70635f9 100644
--- a/src/test/subscription/t/005_encoding.pl
+++ b/src/test/subscription/t/005_encoding.pl
@@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
 
-$node_publisher->wait_for_catchup('mysub');
-
 # Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
 
 $node_publisher->safe_psql('postgres',
 	q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl
index c924ff35f7..49d3b9f032 100644
--- a/src/test/subscription/t/006_rewrite.pl
+++ b/src/test/subscription/t/006_rewrite.pl
@@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
 
-$node_publisher->wait_for_catchup('mysub');
-
 # Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
 
 $node_publisher->safe_psql('postgres',
 	q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 01df54229c..0a5b4bbed8 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -49,13 +49,8 @@ ok( $stderr =~
 	  m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
 	"Create subscription throws warning for non-existent publication");
 
-$node_publisher->wait_for_catchup('mysub1');
-
 # Also wait for initial table sync to finish.
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
 
 # Specifying non-existent publication along with add publication.
 ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
index 67b4026afa..934723aeed 100644
--- a/src/test/subscription/t/008_diff_schema.pl
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres',
@@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
 $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
 
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # Add replica identity column.  (The serial is not necessary, but it's
 # a convenient way to get a default on the new column so that rows
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index d519249431..a6fe82a71f 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # insert data to truncate
 
@@ -211,8 +208,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # wait for initial data sync
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # insert data to truncate
 
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
index e991a08032..3d96f6f30f 100644
--- a/src/test/subscription/t/011_generated.pl
+++ b/src/test/subscription/t/011_generated.pl
@@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
 is( $result, qq(1|22
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 0dfbbabc3b..8b33e4e7ae 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
 });
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber1->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
 
 # Tests for replication using leaf partition identity and schema
 
@@ -490,10 +486,8 @@ $node_subscriber2->safe_psql('postgres',
 	"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
 
 # Wait for initial sync of all subscriptions
-$node_subscriber1->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
 
 # check that data is synced correctly
 $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
@@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres',
 
 # make sure the subscription on the second subscriber is synced, before
 # continuing
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
 
 # Insert a change into the leaf partition, should be replicated through
 # the partition root (thanks to the FOR ALL TABLES partition).
@@ -824,8 +817,7 @@ $node_subscriber2->safe_psql(
 $node_subscriber2->safe_psql('postgres',
 	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
 
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
 
 # Make partition map cache
 $node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index a1f03e7adc..8d8b35721f 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres',
 	  . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
 
 # Ensure nodes are in sync with each other
-$node_publisher->wait_for_catchup('tsub');
-$node_subscriber->poll_query_until('postgres',
-	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
-) or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 6561b189de..97985ef079 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl
index f27f1694f2..6085b3f7ad 100644
--- a/src/test/subscription/t/016_stream_subxact.pl
+++ b/src/test/subscription/t/016_stream_subxact.pl
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl
index 0bce63b716..62844ceae2 100644
--- a/src/test/subscription/t/017_stream_ddl.pl
+++ b/src/test/subscription/t/017_stream_ddl.pl
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 7155442e76..10377eaeeb 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
index dbd0fca4d1..4eb88d5fc5 100644
--- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
+++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index c3e9857f7c..c53f36d831 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -54,13 +54,7 @@ $node_subscriber->safe_psql(
 	WITH (two_phase = on)");
 
 # Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Also wait for two-phase to be enabled
 my $twophase_query =
@@ -332,11 +326,7 @@ $node_subscriber->safe_psql(
 	WITH (two_phase=on, copy_data=false);");
 
 # Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname_copy);
-
-# Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
 
 # Also wait for two-phase to be enabled
 $node_subscriber->poll_query_until('postgres', $twophase_query)
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index d8475d25a4..60de9b18fe 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -55,14 +55,8 @@ $node_subscriber->safe_psql(
 	PUBLICATION tap_pub
 	WITH (streaming = on, two_phase = on)");
 
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Also wait for two-phase to be enabled
 my $twophase_query =
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 246f8c9237..eaf47e66f1 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_1 is copied to subscriber
 my $result = $node_subscriber->safe_psql('postgres',
@@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
 
 # Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_drop_refresh was copied to subscriber
 $result = $node_subscriber->safe_psql('postgres',
@@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
 
 # Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_1 was copied to subscriber again
 $result = $node_subscriber->safe_psql('postgres',
diff --git a/src/test/subscription/t/025_rep_changes_for_schema.pl b/src/test/subscription/t/025_rep_changes_for_schema.pl
index 5ce275cf72..f30585289a 100644
--- a/src/test/subscription/t/025_rep_changes_for_schema.pl
+++ b/src/test/subscription/t/025_rep_changes_for_schema.pl
@@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
 );
 
-$node_publisher->wait_for_catchup('tap_sub_schema');
-
 # Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema');
 
 # Check the schema table data is synced up
 my $result = $node_subscriber->safe_psql('postgres',
@@ -123,8 +118,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
 
@@ -158,8 +152,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
@@ -183,8 +176,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index e8f3e4bba1..4e094ac756 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -153,13 +153,8 @@ SET SESSION AUTHORIZATION regress_admin;
 CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
 ));
 
-$node_publisher->wait_for_catchup('admin_sub');
-
 # Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
 
 # Verify that "regress_admin" can replicate into the tables
 #
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index b1fb2d7cae..0cef5db132 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my $appname           = 'tap_sub';
 
@@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
 );
 
-$node_publisher->wait_for_catchup($appname);
-# wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# wait for table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # The subscription of the FOR ALL TABLES publication means there should be no
 # filtering on the tablesync COPY, so all expect all 5 will be present.
@@ -133,10 +128,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
 );
 
-$node_publisher->wait_for_catchup($appname);
-# wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# wait for table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # The subscription of the ALL TABLES IN SCHEMA publication means there should be
 # no filtering on the tablesync COPY, so expect all 5 will be present.
@@ -397,11 +390,8 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Check expected replicated rows for tab_rowfilter_1
 # tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
@@ -622,8 +612,7 @@ $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
 
 # wait for table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
index 303e8ec3fc..05daa77c58 100644
--- a/src/test/subscription/t/029_on_error.pl
+++ b/src/test/subscription/t/029_on_error.pl
@@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
 $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
 
 # Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
-	"SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
 
 # Confirm that we have finished the table sync.
 my $result =
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index e9241d2996..2518bdd919 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -52,16 +52,8 @@ $node_A->safe_psql(
 	WITH (origin = none, copy_data = off)");
 
 # Wait for subscribers to finish initialization
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_A->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_B->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_A->wait_for_subscription_sync($node_B, $appname_A);
+$node_B->wait_for_subscription_sync($node_A, $appname_B1);
 
 is(1, 1, 'Bidirectional replication setup is complete');
 
@@ -128,8 +120,7 @@ $node_B->safe_psql(
 
 $node_C->wait_for_catchup($appname_B2);
 
-$node_B->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->wait_for_subscription_sync;
 
 # insert a record
 $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9fa6e0b35f..adae725f59 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -22,18 +22,6 @@ $node_subscriber->start;
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my $offset            = 0;
 
-sub wait_for_subscription_sync
-{
-	my ($node) = @_;
-
-	# Also wait for initial table sync to finish
-	my $synced_query =
-	  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
-	$node->poll_query_until('postgres', $synced_query)
-	  or die "Timed out while waiting for subscriber to synchronize data";
-}
-
 # setup tables on both nodes
 
 # tab1: simple 1:1 replication
@@ -160,7 +148,7 @@ $node_subscriber->safe_psql(
 	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 # tab1: only (a,b) is replicated
 $result =
@@ -333,7 +321,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for the tablesync to complete, add a bit more data and then check
 # the results of the replication
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -385,7 +373,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->wait_for_catchup('sub1');
 
@@ -428,7 +416,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -465,7 +453,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -504,7 +492,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -621,7 +609,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -687,7 +675,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -758,7 +746,7 @@ $node_subscriber->safe_psql(
 	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -795,7 +783,7 @@ $node_subscriber->safe_psql(
 	TRUNCATE test_part_c;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -855,7 +843,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -898,7 +886,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -938,7 +926,7 @@ $node_subscriber->safe_psql(
 	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -985,7 +973,7 @@ $node_subscriber->safe_psql(
 	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -1034,7 +1022,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -1058,7 +1046,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -1102,7 +1090,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -1150,7 +1138,7 @@ $node_subscriber->safe_psql(
 	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
 	'postgres', qq(
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 11ba473715..ca629d4f9c 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -144,12 +144,7 @@ $node_twoways->safe_psql('d2',
 # We cannot rely solely on wait_for_catchup() here; it isn't sufficient
 # when tablesync workers might still be running. So in addition to that,
 # verify that tables are synced.
-# XXX maybe this should be integrated in wait_for_catchup() itself.
-$node_twoways->wait_for_catchup('testsub');
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_twoways->poll_query_until('d2', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
 
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
 	$rows * 2, "2x$rows rows in t");
@@ -281,8 +276,7 @@ $node_subscriber->safe_psql('postgres',
 $node_publisher->wait_for_catchup('tap_sub');
 
 # Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 is( $node_subscriber->safe_psql(
 		'postgres', "SELECT * FROM tab_replidentity_index"),
-- 
2.24.3 (Apple Git-128)

Reply via email to