On 2021-Sep-06, Rahila Syed wrote:

> > > ... ugh.  Since CASCADE is already defined to be a
> > > potentially-data-loss operation, then that may be acceptable
> > > behavior.  For sure the default RESTRICT behavior shouldn't do it,
> > > though.
> >
> > That makes sense to me.
>
> However, the default (RESTRICT) behaviour of DROP TABLE allows
> removing the table from the publication. I have implemented the
> removal of table from publication on drop column (RESTRICT)  on the
> same lines.

But dropping the table is quite a different action from dropping a
column, isn't it?  If you drop a table, it seems perfectly reasonable
that it has to be removed from the publication -- essentially, when the
user drops a table, she is saying "I don't care about this table
anymore".  However, if you drop just one column, that doesn't
necessarily mean that the user wants to stop publishing the whole table.
Removing the table from the publication in ALTER TABLE DROP COLUMN seems
like an overreaction.  (Except perhaps in the special case were the
column being dropped is the only one that was being published.)

So let's discuss what should happen.  If you drop a column, and the
column is filtered out, then it seems to me that the publication should
continue to have the table, and it should continue to filter out the
other columns that were being filtered out, regardless of CASCADE/RESTRICT.
However, if the column is *included* in the publication, and you drop
it, ISTM there are two cases:

1. If it's DROP CASCADE, then the list of columns to replicate should
continue to have all columns it previously had, so just remove the
column that is being dropped.

2. If it's DROP RESTRICT, then an error should be raised so that the
user can make a concious decision to remove the column from the filter
before dropping the column.

> Did you give any thoughts to my earlier suggestion related to syntax [1]?
> 
> [1] 
> https://www.postgresql.org/message-id/CAA4eK1J9b_0_PMnJ2jq9E55bcbmTKdUmy6jPnkf1Zwy2jxah_g%40mail.gmail.com

This is a great followup idea, after the current feature is committed.
There are a few things that have been reported in review comments; let's
get those addressed before adding more features on top.

I pushed the clerical part of this -- namely the addition of
PublicationTable node and PublicationRelInfo struct.  I attach the part
of your v4 patch that I didn't include.  It contains a couple of small
corrections, but I didn't do anything invasive (such as pgindent)
because that would perhaps cause you too much merge pain.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
>From 6a9e266cc8ce10f087a906bae2be7f6682ba19ac Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 6 Sep 2021 10:34:29 -0300
Subject: [PATCH v5] Add column filtering to logical replication

Add capability to specifiy column names while linking
the table to a publication, at the time of CREATE or ALTER
publication. This will allow replicating only the specified
columns. Other columns, if any, on the subscriber will be populated
locally or NULL will be inserted if no value is supplied for the column
by the upstream during INSERT.
This facilitates replication to a table on subscriber
containing only the subscribed/filtered columns.
If no filter is specified, all the columns are replicated.
REPLICA IDENTITY columns are always replicated.
Thus, prohibit adding relation to publication, if column filters
do not contain REPLICA IDENTITY.
Add a tap test for the same in src/test/subscription.
---
 src/backend/access/common/relation.c        | 21 +++++
 src/backend/catalog/pg_publication.c        | 56 +++++++++++-
 src/backend/commands/publicationcmds.c      |  8 +-
 src/backend/nodes/copyfuncs.c               |  1 +
 src/backend/nodes/equalfuncs.c              |  1 +
 src/backend/parser/gram.y                   |  3 +-
 src/backend/replication/logical/proto.c     | 90 ++++++++++++++-----
 src/backend/replication/logical/tablesync.c | 97 +++++++++++++++++++--
 src/backend/replication/pgoutput/pgoutput.c | 74 +++++++++++++---
 src/include/catalog/pg_publication.h        |  1 +
 src/include/catalog/pg_publication_rel.h    |  4 +
 src/include/nodes/parsenodes.h              |  1 +
 src/include/replication/logicalproto.h      |  6 +-
 src/include/utils/rel.h                     |  1 +
 14 files changed, 317 insertions(+), 47 deletions(-)

diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c
index 632d13c1ea..59c1136f2e 100644
--- a/src/backend/access/common/relation.c
+++ b/src/backend/access/common/relation.c
@@ -21,12 +21,14 @@
 #include "postgres.h"
 
 #include "access/relation.h"
+#include "access/sysattr.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/lmgr.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
 
@@ -215,3 +217,22 @@ relation_close(Relation relation, LOCKMODE lockmode)
 	if (lockmode != NoLock)
 		UnlockRelationId(&relid, lockmode);
 }
+
+/*
+ * Return a bitmapset of attributes given the list of column names
+ */
+Bitmapset*
+get_table_columnset(Oid relid, List *columns, Bitmapset *att_map)
+{
+	ListCell *cell;
+	foreach(cell, columns)
+	{
+		const char *attname = lfirst(cell);
+		int attnum = get_attnum(relid, attname);
+
+		if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_map))
+			att_map = bms_add_member(att_map,
+					attnum - FirstLowInvalidHeapAttributeNumber);
+	}
+	return att_map;
+}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index d6fddd6efe..a78af8f807 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -47,8 +47,12 @@
  * error if not.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, List *targetcols)
 {
+	bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	Oid			relid = RelationGetRelid(targetrel);
+	Bitmapset *idattrs;
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -73,6 +77,35 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("Temporary and unlogged relations cannot be replicated.")));
+
+	/*
+	 * Cannot specify column filter when REPLICA IDENTITY IS FULL
+	 * or if column filter does not contain REPLICA IDENITY columns
+	 */
+	if (targetcols != NIL)
+	{
+		if (replidentfull)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot add relation \"%s\" to publication",
+							RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter with REPLICA IDENTITY FULL")));
+		else
+		{
+			Bitmapset *filtermap = NULL;
+			idattrs = RelationGetIndexAttrBitmap(targetrel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+			filtermap = get_table_columnset(relid, targetcols, filtermap);
+			if (!bms_is_subset(idattrs, filtermap))
+			{
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot add relation \"%s\" to publication",
+								RelationGetRelationName(targetrel)),
+						errdetail("Column filter must include REPLICA IDENTITY columns")));
+			}
+		}
+	}
 }
 
 /*
@@ -153,6 +186,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ListCell *lc;
+	List *target_cols = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -175,7 +210,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	foreach(lc, targetrel->columns)
+	{
+		char *colname;
+
+		colname = strVal(lfirst(lc));
+		target_cols = lappend(target_cols, colname);
+	}
+	check_publication_add_relation(targetrel->relation, target_cols);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -188,6 +230,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	values[Anum_pg_publication_rel_prattrs - 1] =
+		PointerGetDatum(strlist_to_textarray(target_cols));
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -196,7 +240,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	heap_freetuple(tup);
 
 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
+	foreach(lc, target_cols)
+	{
+		int  attnum;
+		attnum = get_attnum(relid, lfirst(lc));
 
+		/* Add dependency on the column */
+		ObjectAddressSubSet(referenced, RelationRelationId, relid, attnum);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+	}
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 179a0ef982..3c71bfb1f2 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -413,7 +413,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				 /* This is not needed to delete a table */
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -548,6 +549,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = NIL;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -581,8 +584,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = NIL;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index e308de170e..857129a371 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4945,6 +4945,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 99440b40be..b0f37b2ceb 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -3118,6 +3118,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6a0f46505c..e5cd7ea74f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9637,10 +9637,11 @@ publication_table_list:
 				{ $$ = lappend($1, $3); }
 		;
 
-publication_table: relation_expr
+publication_table: relation_expr opt_column_list
 		{
 			PublicationTable *n = makeNode(PublicationTable);
 			n->relation = $1;
+			n->columns = $2;
 			$$ = (Node *) n;
 		}
 	;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..f9e5179860 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,9 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+					HeapTuple tuple, bool binary, Bitmapset *att_map);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, att_map);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map)
 {
 	char	   *relname;
 
@@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, att_map);
 }
 
 /*
@@ -749,20 +749,37 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary,
+							Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
 	bool		isnull[MaxTupleAttributeNumber];
 	int			i;
 	uint16		nliveatts = 0;
+	Bitmapset  *idattrs = NULL;
+	bool		replidentfull;
+	Form_pg_attribute att;
 
 	desc = RelationGetDescr(rel);
 
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	for (i = 0; i < desc->natts; i++)
 	{
+		att = TupleDescAttr(desc, i);
 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
 			continue;
+		/*
+		 * Do not increment count of attributes if not a part of column filters
+		 * except for replica identity columns or if replica identity is full.
+		 */
+		if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map)
+			&& !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -800,6 +817,16 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 			continue;
 		}
 
+		/*
+		 * Do not send attribute data if it is not a part of column filters,
+		 * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is
+		 * full, send the data.
+		 */
+		if (att_map != NULL && !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_map)
+			&& !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
+
 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
 		if (!HeapTupleIsValid(typtup))
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
@@ -904,7 +931,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +941,34 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
-	/* send number of live attributes */
-	for (i = 0; i < desc->natts; i++)
-	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
-			continue;
-		nliveatts++;
-	}
-	pq_sendint16(out, nliveatts);
-
 	/* fetch bitmap of REPLICATION IDENTITY attributes */
 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 	if (!replidentfull)
 		idattrs = RelationGetIdentityKeyBitmap(rel);
 
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+		if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+		{
+			nliveatts++;
+			continue;
+		}
+		/* Skip sending if not a part of column filter */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map))
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -937,6 +978,13 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/* Exlude filtered columns, REPLICA IDENTITY COLUMNS CAN'T BE EXCLUDED */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map) && !bms_is_member(att->attnum
+					- FirstLowInvalidHeapAttributeNumber, idattrs)
+						&& !replidentfull)
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..9bd834914b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -695,19 +696,27 @@ fetch_remote_table_info(char *nspname, char *relname,
 						LogicalRepRelation *lrel)
 {
 	WalRcvExecResult *res;
+	WalRcvExecResult *res_pub;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
+	TupleTableSlot *slot_pub;
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			pubRow[] = {TEXTARRAYOID};
 	bool		isnull;
-	int			natt;
+	int			natt,i;
+	Datum *elems;
+	int nelems;
+	List *pub_columns = NIL;
+	ListCell *lc;
+	bool	am_partition = false;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,6 +746,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull));
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
@@ -774,11 +784,79 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+
+	/*
+	 * Now, fetch the values of publications' column filters
+	 * For a partition, use pg_inherit to find the parent,
+	 * as the pg_publication_rel contains only the topmost parent
+	 * table entry in case the table is partitioned.
+	 * Run a recursive query to iterate through all the parents
+	 * of the partition and retreive the record for the parent
+	 * that exists in pg_publication_rel.
+	 */
+	resetStringInfo(&cmd);
+	if (!am_partition)
+		appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel"
+					" WHERE prrelid = %u", lrel->remoteid);
+	else
+		appendStringInfo(&cmd, "WITH RECURSIVE t(inhparent) AS ( SELECT inhparent from pg_inherits where inhrelid = %u"
+					" UNION SELECT pg.inhparent from pg_inherits pg, t where inhrelid = t.inhparent)"
+					" SELECT prattrs from pg_publication_rel WHERE prrelid IN (SELECT inhparent from t)", lrel->remoteid);
+
+	res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+					  lengthof(pubRow), pubRow);
+
+	if (res_pub->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res_pub->err)));
+	slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub))
+	{
+		deconstruct_array(DatumGetArrayTypePCopy(slot_getattr(slot_pub, 1, &isnull)),
+				TEXTOID, -1, false, 'i',
+					&elems, NULL, &nelems);
+		for (i = 0; i < nelems; i++)
+			pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i]));
+		ExecClearTuple(slot_pub);
+	}
+	ExecDropSingleTupleTableSlot(slot_pub);
+	walrcv_clear_result(res_pub);
+
+	/*
+	 * Store the column names only if they are contained in column filter
+	 * LogicalRepRelation will only contain attributes corresponding
+	 * to those specficied in column filters.
+	 */
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
+		char * rel_colname =
 			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		bool found = false;
 		Assert(!isnull);
+		if (pub_columns != NIL)
+		{
+			foreach(lc, pub_columns)
+			{
+				char *pub_colname = lfirst(lc);
+				if(!strcmp(pub_colname, rel_colname))
+				{
+					found = true;
+					lrel->attnames[natt] = rel_colname;
+					break;
+				}
+			}
+		}
+		else
+		{
+			found = true;
+			lrel->attnames[natt] = rel_colname;
+		}
+		if (!found)
+			continue;
+
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
@@ -829,8 +907,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd93..bbaf8dcd5e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *att_map);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,7 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+	Bitmapset *att_map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->att_map);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+							Bitmapset *att_map)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -609,14 +615,24 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
-
+		/*
+		 * Do not send type information if attribute is
+		 * not present in column filter.
+		 * XXX Allow sending type information for REPLICA
+		 * IDENTITY COLUMNS with user created type.
+		 * even when they are not mentioned in column filters.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						att_map))
+			continue;
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, att_map);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1119,6 +1135,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid		ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1139,8 +1156,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
-		entry->map = NULL;		/* will be set by maybe_send_schema() if
-								 * needed */
+		entry->att_map = NULL;
+		entry->map = NULL;	/* will be set by maybe_send_schema() if needed */
 	}
 
 	/* Validate the entry */
@@ -1171,6 +1188,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1181,7 +1199,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
 
 				/*
 				 * For a partition, check if any of the ancestors are
@@ -1206,6 +1223,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1224,15 +1242,41 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				int                             nelems, i;
+				bool isnull;
+				Datum *elems;
+				HeapTuple pub_rel_tuple;
+				Datum pub_rel_cols;
+				List *columns = NIL;
+
+				if (ancestor_published)
+					pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(ancestor_id),
+								ObjectIdGetDatum(pub->oid));
+				else
+					pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid),
+								ObjectIdGetDatum(pub->oid));
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, pub_rel_tuple, Anum_pg_publication_rel_prattrs, &isnull);
+					if (!isnull)
+					{
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols),
+									TEXTOID, -1, false, 'i',
+								&elems, NULL, &nelems);
+						for (i = 0; i < nelems; i++)
+							columns = lappend(columns, TextDatumGetCString(elems[i]));
+						entry->att_map = get_table_columnset(publish_as_relid, columns, entry->att_map);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1328,6 +1372,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->att_map);
+		entry->att_map = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 561266aa3e..809413938f 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	*columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..d1d4eec2c0 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	text			prattrs[1]; /* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8895, 8896);
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 743e5aa4f3..4c0c5afacd 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3628,6 +3628,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 typedef struct CreatePublicationStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..709b4be916 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *att_map);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b4faa1c123..b4c49fa32f 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -680,5 +680,6 @@ RelationGetSmgr(Relation rel)
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
 extern void RelationDecrementReferenceCount(Relation rel);
+extern Bitmapset* get_table_columnset(Oid relid, List *columns, Bitmapset *att_map);
 
 #endif							/* REL_H */
-- 
2.20.1

Reply via email to