On Thu, 5 Jan 2023 at 03:17, Tom Lane <t...@sss.pgh.pa.us> wrote:
>
> vignesh C <vignes...@gmail.com> writes:
> > [ v3-0001-Fix-for-invalidating-logical-replication-relation.patch ]
>
> (btw, please don't send multiple patch versions with the same number,
> it's very confusing.)

Since it was just rebasing on top of HEAD, I did not change the
version, I will take care of this point in the later versions.

> I looked briefly at this patch.  I wonder why you wrote a whole new
> callback function instead of just using rel_sync_cache_publication_cb
> for this case too.

Yes we can use rel_sync_cache_publication_cb itself for the
invalidation of the relations, I have changed it.

> The bigger picture here though is that in examples such as the one
> you gave at the top of the thread, it's not very clear to me that
> there's *any* principled behavior.  If the connection between publisher
> and subscriber tables is only the relation name, fine ... but exactly
> which relation name applies?  If you've got a transaction that is both
> inserting some data and renaming the table, it's really debatable which
> insertions should be sent under which name(s).  So how much should we
> actually care about such cases?  Do we really want to force a cache flush
> any time somebody changes a (possibly unrelated) pg_namespace entry?
> We could be giving up significant performance and not accomplishing much
> except changing from one odd behavior to a different one.

The connection between publisher and subscriber table is based on
relation id, During the first change relid, relname and schema name
from publisher will be sent to the subscriber. Subscriber stores these
id, relname and schema name in the LogicalRepRelMap hash for which
relation id is the key. Subsequent data received in the subscriber
will use the relation id received from the publisher and apply the
changes in the subscriber.
The problem does not stop even after the transaction that renames the
schema is completed(Step3 in first mail). Even after the transaction
is completed i.e after Step 3 the inserts of sch1.t1 and sch2.t1 both
get replicated to sch1.t1 in the subscriber side. This happens because
the publisher id's of sch2.t1 and sch1.t1 are mapped to sch1.t1 in the
subscriber side and both inserts are successful.
Step4) In Publisher
postgres=# insert into sch2.t1 values(11);
INSERT 0 1
postgres=# insert into sch1.t1 values(12);
INSERT 0 1

Step5) In Subscriber
postgres=# select * from sch1.t1;
 c1
----
 11
 12
(2 rows)

During the sch1.t1 first insertion the relid, relname and schema name
from publisher will be sent to the subscriber, this entry will be
mapped to sch1.t1 in subscriber side and any insert from the publisher
will insert to sch1.t1.
After the rename of schema(relid will not be changed) since this entry
is not invalidated, even though we are inserting to sch2.t1 as the
relid is not changed, subscriber will continue to insert into sch1.t1
in subscriber.
During the first insert of new table sch1.t1, the relid, relname and
schema name from publisher will be sent to the subscriber, this entry
will be again mapped to sch1.t1 in the subscriber side.
Since both the entries sch1.t1 and sch2.t1 are mapped to sch1.t1 in
the subscriber side, both inserts will insert to the same table.
This issue will get fixed if we invalidate the relation and update the
relmap in the subscriber.
I did not like the behavior where any insert on sch1.t1 or sch2.t1
replicates the changes to sch1.t1 in the subscriber. I felt it might
be better to fix this issue. I agree that the invalidations are
costly. If you feel this is a very corner case then we can skip it.

Attached an updated patch.

Regards,
Vignesh
From 74211d016528699205163dab8ecc7fe04aec09b2 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Sat, 12 Mar 2022 13:04:55 +0530
Subject: [PATCH v4] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c |  3 +
 src/test/subscription/t/001_rep_changes.pl  | 67 +++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..f68c271cf2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1936,6 +1936,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
 }
 
 /*
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 91aa068c95..3e91640c03 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -542,6 +542,73 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.34.1

Reply via email to