From 719073e0a26fff7cac5ce83896958987528ad7be Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 16 Apr 2025 23:36:00 -0400
Subject: [PATCH v17 3/3] Tests for filtering unpublished changes

---
 src/backend/replication/logical/reorderbuffer.c |  2 +-
 src/test/subscription/t/001_rep_changes.pl      | 75 +++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bdf63c9..98dca48 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -378,7 +378,7 @@ static void ReorderBufferMemoryResetcallback(void *arg);
  * hash table search for each record, especially when most changes are not
  * filterable.
  */
-#define CHANGES_THRESHOLD_FOR_FILTER 100
+#define CHANGES_THRESHOLD_FOR_FILTER 0
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 916fdb4..cfdc642 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -571,6 +571,81 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_replication_origin");
 is($result, qq(0), 'check replication origin was dropped on subscriber');
 
+$node_publisher->stop('fast');
+
+# Additional tests for filtering of unpublished changes
+# Bump up log verbosity to DEBUG1 for confirmation logs
+$node_publisher->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_publisher->start;
+
+# Create new tables on publisher and subscriber
+$node_publisher->safe_psql('postgres',
+							"CREATE TABLE pub_table (id int primary key, data text);
+							 CREATE TABLE unpub_table (id int primary key, data text);
+							 CREATE TABLE insert_only_table (id int primary key, data text);
+							 CREATE TABLE delete_only_table (id int primary key, data text);");
+
+$node_subscriber->safe_psql('postgres',
+							"CREATE TABLE pub_table (id int primary key, data text);
+							 CREATE TABLE unpub_table (id int primary key, data text);
+							 CREATE TABLE insert_only_table (id int primary key, data text);
+							 CREATE TABLE delete_only_table (id int primary key, data text);");
+
+# Setup logical replication publications
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_all FOR TABLE pub_table;
+	 CREATE PUBLICATION pub_insert_only FOR TABLE insert_only_table WITH (publish = insert);
+	 CREATE PUBLICATION pub_delete_only FOR TABLE delete_only_table WITH (publish = delete);");
+
+# Setup logical replication subscriptions
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION sub_all CONNECTION '$publisher_connstr' PUBLICATION pub_all;
+ CREATE SUBSCRIPTION sub_insert_only CONNECTION '$publisher_connstr' PUBLICATION pub_insert_only;
+ CREATE SUBSCRIPTION sub_delete_only CONNECTION '$publisher_connstr' PUBLICATION pub_delete_only;");
+
+# Wait for initial sync
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_all');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_insert_only');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_delete_only');
+
+# Insert into an unpublished table (should not be replicated)
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO unpub_table VALUES (1, 'unpublished')");
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering INSERT/,
+	'unpublished INSERT is filtered');
+
+# Insert, update, and delete tests for publication with restricted tables
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO insert_only_table VALUES (1, 'to be inserted')");
+$node_publisher->safe_psql('postgres', "UPDATE insert_only_table SET data = 'updated' WHERE id = 1");
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering UPDATE/,
+	'unpublished UPDATE is filtered');
+
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "DELETE FROM insert_only_table WHERE id = 1");
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering DELETE/,
+	'unpublished DELETE is filtered');
+
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO delete_only_table VALUES (1, 'to be deleted')");
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering INSERT/,
+	'unpublished INSERT is filtered');
+
+#cleanup
+$node_subscriber->safe_psql('postgres',
+					"DROP SUBSCRIPTION sub_all;
+					 DROP SUBSCRIPTION sub_insert_only;
+					 DROP SUBSCRIPTION sub_delete_only;");
+
+$node_publisher->safe_psql('postgres',
+					"DROP PUBLICATION pub_all;
+					 DROP PUBLICATION pub_insert_only;
+					 DROP PUBLICATION pub_delete_only;");
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
1.8.3.1

