On Mon, 21 Oct 2024 at 14:36, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Oct 7, 2024 at 11:05 AM vignesh C <vignes...@gmail.com> wrote:
> >
> > The tests demonstrate a significant performance improvement when using
> > the parallel streaming option, insert shows 40-48 %improvement, delete
> > shows 34-39 %improvement, update shows 26-30 %improvement. In the case
> > of rollback the improvement is between 12-44%, the improvement
> > slightly reduces with larger amounts of data being rolled back in this
> > case. If there's a significant amount of data to roll back, the
> > performance of streaming in parallel may be comparable to or slightly
> > lower in some instances. However, this is acceptable since commit
> > operations are generally more frequent than rollback operations.
> >
> > One key point to consider is that the lock on transaction objects will
> > be held for a longer duration when using streaming in parallel. This
> > occurs because the parallel apply worker initiates the transaction as
> > soon as streaming begins, maintaining the lock until the transaction
> > is fully completed. As a result, for long-running transactions, this
> > extended lock can hinder concurrent access that requires a lock.
> >
>
> The longer-running transactions will anyway have a risk of deadlocks
> or longer waits if there are concurrent operations on the subscribers.
> However, with parallel apply, there is a risk of deadlock among the
> leader and parallel workers when the schema in publisher and
> subscriber is different. Say the subscriber has a unique constraint
> that the publisher doesn't have. See the comments in this regard atop
> applyparallelworker.c in the "Locking Considerations" section. Having
> said that, the apply workers will detect deadlock in such cases and
> will retry to apply the errored-out transaction. So, there is a
> self-healing in-built mechanism and in such cases, we anyway have a
> risk of UNIQUE_KEY conflict ERRORs which in most cases would require
> manual intervention.
>
> > Since there is a significant percentage improvement, we should make
> > the default subscription streaming option parallel.
> >
>
> This makes sense to me. I think it would be better to add a Note or
> Warning in the docs for the risk of deadlock when the schema of
> publisher and subscriber is not the same even though such cases should
> be less.

Yes, this can happen in scenarios like the one below (with deadlock_timeout = 10ms):
Publisher:
CREATE TABLE t1(c1 int);
CREATE PUBLICATION pub1 FOR TABLE t1;

Subscriber has an additional index on the table:
CREATE TABLE t1(c1 int);
CREATE UNIQUE INDEX idx1 ON t1(c1);
CREATE SUBSCRIPTION ...;

Publisher:
Session1:
Begin;
INSERT INTO t1 SELECT i FROM generate_series(1, 5000) s(i);

Session2:
-- Insert a row that session 1 has already inserted but not yet committed
INSERT INTO t1 VALUES (1);

Session1:
Commit;

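In this case a deadlock will occur on the subscriber: the leader apply
worker, applying session 2's insert, waits on the unique index for the
parallel apply worker's open transaction, while the parallel apply
worker waits for the leader to send it the rest of session 1's
transaction.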

The attached v3 patch adds a caution about this.
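
For a subscription whose schema is known to differ from the publisher's,
the new default could be overridden explicitly. A minimal sketch,
assuming a subscription named sub1 (a hypothetical name, since the
scenario above elides the CREATE SUBSCRIPTION command):

```sql
-- sub1 is a hypothetical subscription name used only for illustration.
-- Fall back to the pre-patch behaviour for this one subscription.
ALTER SUBSCRIPTION sub1 SET (streaming = off);

-- Check the stored setting: substream is 'f' (off), 't' (on) or 'p' (parallel).
SELECT subname, substream FROM pg_subscription WHERE subname = 'sub1';
```

The same option can be given at creation time in the WITH clause of
CREATE SUBSCRIPTION, as the pg_dump tests in the patch do.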

Regards,
Vignesh
From a56f565689b8b18573acd873a097a213af6c6722 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v3] Make parallel the default value of the subscription
 streaming option.

Currently, the default value of the streaming option is off, so all
transactions are fully decoded on the publisher before being sent
to the subscriber. This approach can lead to reduced performance,
particularly under heavy load.

Change the default streaming option to parallel. With this setting,
incoming changes are applied directly by one of the available
parallel apply workers, which significantly improves the
performance of commit operations.
---
 doc/src/sgml/ref/create_subscription.sgml  | 27 ++++++++++++++--------
 src/backend/commands/subscriptioncmds.c    |  2 +-
 src/bin/pg_dump/pg_dump.c                  |  2 ++
 src/bin/pg_dump/t/002_pg_dump.pl           | 10 ++++----
 src/test/regress/expected/subscription.out | 24 +++++++++----------
 5 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..3eab06bd2d 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <listitem>
          <para>
           Specifies whether to enable streaming of in-progress transactions
-          for this subscription.  The default value is <literal>off</literal>,
-          meaning all transactions are fully decoded on the publisher and only
-          then sent to the subscriber as a whole.
+          for this subscription.  The default value is <literal>parallel</literal>,
+          meaning incoming changes are directly applied via one of the parallel
+          apply workers, if available. If no parallel apply worker is free to
+          handle streaming transactions then the changes are written to
+          temporary files and applied after the transaction is committed. Note
+          that if an error happens in a parallel apply worker, the finish LSN
+          of the remote transaction might not be reported in the server log.
          </para>
 
+         <caution>
+          <para>
+           There is a risk of deadlock when the schemas of the publisher and
+           subscriber differ, although such cases are rare. The apply worker
+           is equipped to automatically retry these transactions.
+          </para>
+         </caution>
+
          <para>
           If set to <literal>on</literal>, the incoming changes are written to
           temporary files and then applied only after the transaction is
@@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
 
          <para>
-          If set to <literal>parallel</literal>, incoming changes are directly
-          applied via one of the parallel apply workers, if available. If no
-          parallel apply worker is free to handle streaming transactions then
-          the changes are written to temporary files and applied after the
-          transaction is committed. Note that if an error happens in a
-          parallel apply worker, the finish LSN of the remote transaction
-          might not be reported in the server log.
+          If set to <literal>off</literal>, all transactions are fully decoded
+          on the publisher and only then sent to the subscriber as a whole.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	if (IsSet(supported_opts, SUBOPT_BINARY))
 		opts->binary = false;
 	if (IsSet(supported_opts, SUBOPT_STREAMING))
-		opts->streaming = LOGICALREP_STREAM_OFF;
+		opts->streaming = LOGICALREP_STREAM_PARALLEL;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1d79865058..19e3d24326 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5248,6 +5248,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 		appendPQExpBufferStr(query, ", streaming = on");
 	else if (strcmp(subinfo->substream, "p") == 0)
 		appendPQExpBufferStr(query, ", streaming = parallel");
+	else
+		appendPQExpBufferStr(query, ", streaming = off");
 
 	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
 		appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 91a4c63744..213904440f 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3002,7 +3002,7 @@ my %tests = (
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
 						 WITH (connect = false);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+			\QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
@@ -3011,9 +3011,9 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub2
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = none);',
+						 WITH (connect = false, origin = none, streaming = off);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+			\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
@@ -3022,9 +3022,9 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub3
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = any);',
+						 WITH (connect = false, origin = any, streaming = on);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                  List of subscriptions
        Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
                                                                                                                  List of subscriptions
        Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR:  invalid connection string syntax: missing "=" after "foobar" in connecti
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR:  invalid WAL location (LSN): 0/0
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -226,7 +226,7 @@ HINT:  Available values: local, remote_write, remote_apply, on, off.
                                                                                                                        List of subscriptions
         Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 ---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
-- 
2.34.1
