On Fri, 6 Jan 2023 at 04:32, Tom Lane <t...@sss.pgh.pa.us> wrote:
>
> vignesh C <vignes...@gmail.com> writes:
> > On Thu, 5 Jan 2023 at 03:17, Tom Lane <t...@sss.pgh.pa.us> wrote:
> >> The bigger picture here though is that in examples such as the one
> >> you gave at the top of the thread, it's not very clear to me that
> >> there's *any* principled behavior.  If the connection between publisher
> >> and subscriber tables is only the relation name, fine ... but exactly
> >> which relation name applies?
>
> > The connection between publisher and subscriber table is based on
> > relation id, During the first change relid, relname and schema name
> > from publisher will be sent to the subscriber. Subscriber stores these
> > id, relname and schema name in the LogicalRepRelMap hash for which
> > relation id is the key. Subsequent data received in the subscriber
> > will use the relation id received from the publisher and apply the
> > changes in the subscriber.
>
> Hm.  I spent some time cleaning up this patch, and found that there's
> still a problem.  ISTM that the row with value "3" ought to end up
> in the subscriber's sch2.t1 table, but it does not: the attached
> test script fails with
>
> t/100_bugs.pl .. 6/?
> #   Failed test 'check data in subscriber sch2.t1 after schema rename'
> #   at t/100_bugs.pl line 361.
> #          got: ''
> #     expected: '3'
> # Looks like you failed 1 test of 9.
> t/100_bugs.pl .. Dubious, test returned 1 (wstat 256, 0x100)
> Failed 1/9 subtests
>
> What's up with that?

When the subscription is created, the subscriber will create a
subscription relation map of the corresponding relations from the
publication. The subscription relation map will only have sch1.t1
entry. As sch2.t1 was not present in the publisher when the
subscription was created, subscription will not have this entry in the
subscription relation map. So the insert operations performed on the
new table sch2.t1 will not be applied by the subscriber. We will have
to refresh the publication using 'ALTER SUBSCRIPTION ... REFRESH
PUBLICATION' to fetch missing table information from publisher. This
will start replication of tables that were added to the subscribed-to
publications since CREATE SUBSCRIPTION or the last invocation of
REFRESH PUBLICATION.
I have modified the test to include 'ALTER SUBSCRIPTION ... REFRESH
PUBLICATION' to get the new data. The test should expect 1 & 3 for
sch2.t1 as the record with value 1 was already inserted before rename.
The updated v6 patch has the changes for the same.

Regards,
Vignesh
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..876adab38e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	Assert(RelationSyncCache != NULL);
 
+	/* We must update the cache entry for a relation after a relcache flush */
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	/*
+	 * Flush all cache entries after any publication changes.  (We need no
+	 * callback entry for pg_publication, because publication_invalidation_cb
+	 * will take care of it.)
+	 */
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
-	 * is no way to unregister the relcache invalidation callback.
+	 * is no way to unregister the invalidation callbacks.
 	 */
 	if (RelationSyncCache == NULL)
 		return;
 
 	/*
-	 * There is no way to find which entry in our cache the hash belongs to so
-	 * mark the whole cache as invalid.
+	 * We have no easy way to identify which cache entries this invalidation
+	 * event might have affected, so just mark them all invalid.
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792..ed310184ff 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
 pass('index predicates do not cause crash');
 
 # We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");
 
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
@@ -307,6 +308,64 @@ is( $node_subscriber->safe_psql(
 	qq(-1|1),
 	"update works with REPLICA IDENTITY");
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+	'postgres',
+	"begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION tap_sub_sch REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Instead, that row should appear in sch2.t1.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, qq(1
+3), 'check data in subscriber sch2.t1 after schema rename');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 

Reply via email to