Hi,

I found a strange behavior when there is an insert after renaming the
schema. The test steps for the same are given below, Here after the
schema is renamed, the renamed schema table data should not be sent,
but the data was being sent. I felt the schema invalidation was not
called, attached a patch to handle the same. Thoughts?

step 1)
Create schema sch1;
Create table sch1.t1(c1 int);
CREATE PUBLICATION mypub1 FOR all tables;

Step 2)
CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
CREATE SCHEMA sch2;
CREATE TABLE sch2.t1(c1 int);
CREATE TABLE t1(c1 int);
CREATE SUBSCRIPTION mysub1 CONNECTION 'host=localhost port=5432
dbname=postgres' PUBLICATION mypub1;

Step 3)
begin;
insert into sch1.t1 values(1);
alter schema sch1 rename to sch2;
create schema sch1;
create table sch1.t1(c1 int);
insert into sch1.t1 values(2);
insert into sch2.t1 values(3);
commit;

step 4)
select * from sch1.t1; # In subscriber
Got:
c1
----
  1
  2
  3
(3 rows)

Expected:
c1
----
  1
  2
(2 rows)

Regards,
Vignesh
From 66b96e213ee390f0f069b81b4a60d8afc91014c6 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Fri, 2 Jul 2021 10:42:37 +0530
Subject: [PATCH] Fix for invalidating logical replication relations when there
 is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index abd5217ab1..aca4d72e14 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -126,6 +126,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -960,6 +962,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1234,6 +1239,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index dee5f5c30a..aeb4157044 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -518,6 +518,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = get_new_node('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.25.1

Reply via email to