From 94f3bf64e9e06a9a32ce99dc795c06465ecdeb44 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 26 Jun 2025 11:11:48 +0530
Subject: [PATCH v1] Support tables via pg_createsubscriber

This patch adds support for specifying tables to be included in logical
replication publications via pg_createsubscriber. Users can now pass multiple
'--database' and '--table' options to define which tables should be published
and subscribed for each database.

Features:
1. Supports per-database table mapping using multiple '--database'/'--table'
pairs.
2. Allows optional column lists and row filters.
3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is
created.
4. Adds TAP tests to validate combinations of database and table arguments.

This improves fine-grained control over logical replication setup and aligns
pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  12 +
 src/bin/pg_basebackup/pg_createsubscriber.c   | 230 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  83 +++++++
 3 files changed, 322 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..f22a50b2c43 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -125,6 +125,18 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-f <replaceable class="parameter">table</replaceable></option></term>
+     <term><option>--table=<replaceable class="parameter">table</replaceable></option></term>
+     <listitem>
+      <para>
+       Adds a table to be included in the publication for the most recently
+       specified database. Can be repeated multiple times. The syntax
+       supports optional column lists and WHERE clauses.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 025b893a41e..e8874e13f99 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -31,6 +31,46 @@
 #define	DEFAULT_SUB_PORT	"50432"
 #define	OBJECTTYPE_PUBLICATIONS  0x0001
 
+static char *
+pg_strcasestr(const char *haystack, const char *needle)
+{
+	size_t		needlelen;
+
+	if (!haystack || !needle)
+		return NULL;
+
+	needlelen = strlen(needle);
+
+	for (; *haystack; haystack++)
+	{
+		if (pg_strncasecmp(haystack, needle, needlelen) == 0)
+			return (char *) haystack;
+	}
+
+	return NULL;
+}
+
+typedef struct TableSpec
+{
+	char	   *spec;
+	char	   *schema_name;
+	char	   *table_name;
+	char	   *column_list_raw;
+	char	   *where_clause_raw;
+	struct TableSpec *next;
+}			TableSpec;
+
+typedef struct TableListPerDB
+{
+	char	   *dbname;
+	TableSpec  *tables;
+	struct TableListPerDB *next;
+}			TableListPerDB;
+
+static TableListPerDB * dblist_head = NULL;
+static TableListPerDB * dblist_tail = NULL;
+static TableListPerDB * dblist_cur = NULL;
+
 /* Command-line options */
 struct CreateSubscriberOptions
 {
@@ -61,6 +101,7 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	TableSpec  *tables;			/* list of tables to be subscribed */
 };
 
 /*
@@ -249,6 +290,7 @@ usage(void)
 	printf(_("  -a, --all                       create subscriptions for all databases except template\n"
 			 "                                  databases and databases that don't allow connections\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
+	printf(_("  -f, --table                     table to subscribe to; can be specified multiple times\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
 	printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
@@ -505,6 +547,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		else
 			dbinfo[i].subname = NULL;
 		/* Other fields will be filled later */
+		dbinfo[i].tables = NULL;
 
 		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
@@ -525,6 +568,20 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		i++;
 	}
 
+	for (int j = 0; j < num_dbs; j++)
+	{
+		const char *dbname = dbinfo[j].dbname;
+
+		for (TableListPerDB * cur = dblist_head; cur != NULL; cur = cur->next)
+		{
+			if (strcmp(cur->dbname, dbname) == 0)
+			{
+				dbinfo[j].tables = cur->tables;
+				break;
+			}
+		}
+	}
+
 	return dbinfo;
 }
 
@@ -1645,11 +1702,74 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	pg_log_info("creating publication \"%s\" in database \"%s\"",
 				dbinfo->pubname, dbinfo->dbname);
 
-	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
-					  ipubname_esc);
+	if (dbinfo->tables == NULL)
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc);
+	else
+	{
+		bool		first = true;
+		TableSpec  *tbl = dbinfo->tables;
+
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc);
+		while (tbl)
+		{
+			char	   *escaped_schema = NULL;
+			char	   *escaped_table = NULL;
+
+			if (!tbl->table_name || strlen(tbl->table_name) == 0)
+				pg_fatal("table name cannot be null");
+
+			if (tbl->schema_name)
+				escaped_schema = PQescapeIdentifier(conn, tbl->schema_name, strlen(tbl->schema_name));
+			escaped_table = PQescapeIdentifier(conn, tbl->table_name, strlen(tbl->table_name));
+
+			appendPQExpBuffer(str, "%s", first ? "" : ", ");
+
+			if (escaped_schema)
+				appendPQExpBuffer(str, "%s.", escaped_schema);
+			appendPQExpBuffer(str, "%s", escaped_table);
+
+			if (tbl->column_list_raw && strlen(tbl->column_list_raw) > 0)
+				appendPQExpBuffer(str, " (%s)", tbl->column_list_raw);
+
+			if (tbl->where_clause_raw && strlen(tbl->where_clause_raw) > 0)
+				appendPQExpBuffer(str, " WHERE %s", tbl->where_clause_raw);
+
+			first = false;
+			tbl = tbl->next;
+
+			if (escaped_schema)
+				PQfreemem(escaped_schema);
+
+			if (escaped_table)
+				PQfreemem(escaped_table);
+		}
+	}
 
 	pg_log_debug("command is: %s", str->data);
 
+	if (dry_run)
+	{
+		res = PQexec(conn, "BEGIN");
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not begin transaction: %s", PQerrorMessage(conn));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, "ROLLBACK");
+		PQclear(res);
+	}
+
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
@@ -2022,6 +2142,7 @@ main(int argc, char **argv)
 	{
 		{"all", no_argument, NULL, 'a'},
 		{"database", required_argument, NULL, 'd'},
+		{"table", required_argument, NULL, 'f'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
 		{"subscriber-port", required_argument, NULL, 'p'},
@@ -2109,7 +2230,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "ad:f:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2118,6 +2239,7 @@ main(int argc, char **argv)
 				opt.all_dbs = true;
 				break;
 			case 'd':
+				TableListPerDB * newdb;
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
 					simple_string_list_append(&opt.database_names, optarg);
@@ -2125,6 +2247,108 @@ main(int argc, char **argv)
 				}
 				else
 					pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
+
+				newdb = pg_malloc0(sizeof(TableListPerDB));
+				newdb->dbname = pg_strdup(optarg);
+				newdb->tables = NULL;
+				newdb->next = NULL;
+				if (dblist_tail)
+					dblist_tail->next = newdb;
+				else
+					dblist_head = newdb;
+
+				dblist_tail = newdb;
+				dblist_cur = newdb;
+
+				break;
+			case 'f':
+				TableSpec * ts;
+				char	   *full_table_spec;
+				char	   *temp_ptr;
+				char	   *where_start = NULL;
+				char	   *col_list_start = NULL;
+				char	   *col_list_end = NULL;
+				char	   *table_part_end = NULL;
+
+				if (dblist_cur == NULL)
+					pg_fatal("option --table must follow a --database");
+
+				if (strchr(optarg, ';') || strstr(optarg, "--") || strstr(optarg, "/*"))
+					pg_fatal("invalid SQL control characters in --table argument: \"%s\"", optarg);
+
+				ts = pg_malloc0(sizeof(TableSpec));
+				full_table_spec = pg_strdup(optarg);
+
+				where_start = pg_strcasestr(full_table_spec, " WHERE ");
+				if (where_start)
+				{
+					*where_start = '\0';
+					where_start += strlen(" WHERE ");
+					while (*where_start == ' ')
+						where_start++;
+					ts->where_clause_raw = pg_strdup(where_start);
+				}
+
+				col_list_start = strchr(full_table_spec, '(');
+				if (col_list_start)
+				{
+					col_list_end = strrchr(full_table_spec, ')');
+					if (!col_list_end || col_list_end < col_list_start)
+						pg_fatal("malformed column list in --table argument: \"%s\"", optarg);
+
+					*col_list_start = '\0';
+					*col_list_end = '\0';
+					ts->column_list_raw = pg_strdup(col_list_start + 1);
+					table_part_end = col_list_start;
+				}
+				else
+					table_part_end = full_table_spec + strlen(full_table_spec);
+
+				temp_ptr = strrchr(full_table_spec, '.');
+				if (temp_ptr && temp_ptr < table_part_end)
+				{
+					*temp_ptr = '\0';
+					ts->schema_name = pg_strdup(full_table_spec);
+					ts->table_name = pg_strdup(temp_ptr + 1);
+				}
+				else
+				{
+					ts->schema_name = pg_strdup("public");
+					ts->table_name = pg_strdup(full_table_spec);
+				}
+
+				if (ts->table_name)
+				{
+					size_t		len = strlen(ts->table_name);
+
+					while (len > 0 && isspace((unsigned char) ts->table_name[len - 1]))
+						ts->table_name[--len] = '\0';
+				}
+
+				if (ts->schema_name)
+				{
+					size_t		len = strlen(ts->schema_name);
+
+					while (len > 0 && isspace((unsigned char) ts->schema_name[len - 1]))
+						ts->schema_name[--len] = '\0';
+				}
+
+				if (!ts->table_name || strlen(ts->table_name) == 0)
+					pg_fatal("table name cannot be empty in --table argument: \"%s\"", optarg);
+
+				ts->next = NULL;
+
+				if (dblist_cur->tables == NULL)
+					dblist_cur->tables = ts;
+				else
+				{
+					TableSpec  *tail = dblist_cur->tables;
+
+					while (tail->next)
+						tail = tail->next;
+					tail->next = ts;
+				}
+				pg_free(full_table_spec);
 				break;
 			case 'D':
 				subscriber_dir = pg_strdup(optarg);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..3bd49630868 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,92 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Drop existing publications on database db1.
+$node_p->safe_psql(
+	$db1, qq(
+		DROP PUBLICATION test_pub1;
+		DROP PUBLICATION test_pub2;
+		DROP PUBLICATION pub1;
+));
+
+# Drop existing publications on database db2.
+$node_p->safe_psql($db2, "DROP PUBLICATION pub2");
+
+# Test: Table-level publication creation
+$node_p->safe_psql($db1, "CREATE TABLE public.t1 (id int, val text)");
+$node_p->safe_psql($db1, "CREATE TABLE public.t2 (id int, val text)");
+$node_p->safe_psql($db2,
+	"CREATE TABLE public.t3 (id int, val text, extra int)");
+
+# Initialize node_s2 as a fresh standby of node_p for table-level
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber with table-level options
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--table' => 'public.t1 (id)',
+		'--table' => 'public.t2 (val)',
+		'--database' => $db2,
+		'--table' => 'public.t3 (id, extra)',
+	],
+	'pg_createsubscriber runs with table-level publication (existing nodes)');
+
+# Get the publication name created by pg_createsubscriber for db1
+my $pubname1 = $node_p->safe_psql(
+	$db1, qq(
+    SELECT pubname FROM pg_publication
+    WHERE pubname LIKE 'pg_createsubscriber_%'
+    ORDER BY pubname LIMIT 1
+));
+
+# Check publication tables for db1
+my $actual1 = $node_p->safe_psql(
+	$db1, qq(
+	SELECT pubname || '|public|' || tablename
+	FROM pg_publication_tables
+	WHERE pubname = '$pubname1'
+	ORDER BY tablename
+));
+is($actual1, "$pubname1|public|t1\n$pubname1|public|t2",
+	'single publication for both tables created successfully on database db1'
+);
+
+# Get the publication name created by pg_createsubscriber for db2
+my $pubname2 = $node_p->safe_psql(
+	$db2, qq(
+	SELECT pubname FROM pg_publication
+	WHERE pubname LIKE 'pg_createsubscriber_%'
+	ORDER BY pubname LIMIT 1
+));
+
+# Check publication tables for db2
+my $actual2 = $node_p->safe_psql(
+	$db2, qq(
+	SELECT pubname || '|public|' || tablename
+	FROM pg_publication_tables
+	WHERE pubname = '$pubname2'
+	ORDER BY tablename
+));
+is($actual2, "$pubname2|public|t3",
+	'single publication for t3 created successfully on database db2');
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.41.0.windows.3

