Hi,

I went through the v9 patch, and I have a couple of comments / questions. Apologies if some of this was already discussed earlier; it's hard to cross-check in such a long thread. Most of the comments are in 0002, which makes them easier to locate and, I think, makes the proposed code changes clearer.

1) check_publication_add_relation - the "else" branch is not really needed, because the "if (replidentfull)" branch always errors out
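
To illustrate the flattened control flow, here is a minimal standalone sketch. It is not the patch code: check_column_list, the check_result enum and the plain uint64_t bitmasks are hypothetical stand-ins, and the early "return" plays the role of ereport(ERROR), which does not return.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical result codes standing in for the two ereport(ERROR) calls. */
typedef enum
{
    CHECK_OK,
    CHECK_REPLIDENT_FULL,
    CHECK_MISSING_IDENTITY_COLUMN
} check_result;

/*
 * Toy version of the column-list check.  Column sets are plain 64-bit
 * masks here (an assumption made to keep the sketch self-contained).
 */
static check_result
check_column_list(bool has_column_list, bool replidentfull,
                  uint64_t idattrs, uint64_t columns)
{
    /* Precondition of the sketch: no column list means an empty mask. */
    assert(has_column_list || columns == 0);

    if (!has_column_list)
        return CHECK_OK;

    /* In the patch this branch is ereport(ERROR, ...) and never returns. */
    if (replidentfull)
        return CHECK_REPLIDENT_FULL;

    /* No "else" needed: we only get here when replidentfull is false. */
    if ((idattrs & ~columns) != 0)
        return CHECK_MISSING_IDENTITY_COLUMN;

    return CHECK_OK;
}
```

The subset test mirrors bms_is_subset(idattrs, columns): every identity column must also be in the column list.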

2) publication_add_relation has a FIXME about handling cases with different column list

So what's the right behavior for ADD TABLE with a different column list? I'd say we should allow that, and that it should be mostly the same thing as adding/removing columns to/from the list incrementally, i.e. we should replace the column list. We could also prohibit such changes, but that seems like a really annoying limitation, forcing people to remove and re-add the relation.

I added some comments to the attmap translation block, and replaced the <0 check with AttrNumberIsForUserDefinedAttr.

But I wonder if we could get rid of the offset (FirstLowInvalidHeapAttributeNumber), considering we're dealing with just user-defined attributes. That'd make the code clearer, but it would break if we ever compared it to other bitmaps that do use the offset. I don't think we do, though.
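
To make the trade-off concrete, here is a small standalone sketch of the two conventions. It does not use the real Bitmapset API: toy_bitmap is a hypothetical 64-bit stand-in, and ATTNUM_OFFSET is an assumed value for FirstLowInvalidHeapAttributeNumber.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for FirstLowInvalidHeapAttributeNumber; the value is assumed. */
#define ATTNUM_OFFSET (-7)

/* Toy 64-bit set standing in for Bitmapset (hypothetical, not the real API). */
typedef uint64_t toy_bitmap;

static toy_bitmap
toy_add(toy_bitmap set, int member)
{
    assert(member >= 0 && member < 64);
    return set | (UINT64_C(1) << member);
}

static bool
toy_is_member(int member, toy_bitmap set)
{
    assert(member >= 0 && member < 64);
    return (set >> member) & 1;
}

/* Offset convention: also works for system attributes (attnum < 0). */
static toy_bitmap
add_attr_with_offset(toy_bitmap set, int attnum)
{
    return toy_add(set, attnum - ATTNUM_OFFSET);
}

/* No offset: only valid for user-defined attributes (attnum >= 1). */
static toy_bitmap
add_attr_plain(toy_bitmap set, int attnum)
{
    return toy_add(set, attnum);
}

/* True when a plain lookup misses a column stored with the offset. */
static bool
mixed_lookup_misses(int attnum)
{
    toy_bitmap with_offset = add_attr_with_offset(0, attnum);

    return !toy_is_member(attnum, with_offset);
}
```

Either convention is fine on its own; the problem only shows up when a set built one way is probed or compared the other way, which is the breakage mentioned above.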

3) I doubt "att_map" is the right name, though. AFAICS it's just a list of columns for the relation, not a map, right? So maybe attr_list?

4) AlterPublication talks about "publication status" for a column, but do we actually track that? Or what does that mean?

5) PublicationDropTables does a check

    if (pubrel->columns)
        ereport(ERROR,
                errcode(ERRCODE_SYNTAX_ERROR),

Shouldn't this be prevented by the grammar, really? Also, it should be in regression tests.

6) Another thing that should be in the tests is a partitioned table with attribute mapping and a column list, to see how map and attr_map interact.

7) There are a couple of places doing this

    if (att_map != NULL &&
        !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
                       att_map) &&
        !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
                       idattrs) &&
        !replidentfull)

which is really hard to understand (even if we get rid of the offset), so maybe let's move that to a function with a sensible name. Also, some places don't check idattrs - seems a bit suspicious.
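
As a rough idea of what such a function could look like, here is a standalone sketch. The name column_is_filtered_out and the toy_bitmap type are hypothetical; the real code would take Bitmapset pointers, and this sketch uses attribute numbers directly as bit positions (no offset).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy 64-bit set standing in for Bitmapset (hypothetical, not the real API). */
typedef uint64_t toy_bitmap;

static toy_bitmap
toy_add(toy_bitmap set, int member)
{
    assert(member >= 0 && member < 64);
    return set | (UINT64_C(1) << member);
}

static bool
toy_is_member(int member, toy_bitmap set)
{
    assert(member >= 0 && member < 64);
    return (set >> member) & 1;
}

/*
 * Hypothetical helper: should this column be left out of the stream?
 *
 * att_map == NULL means no column list was specified, mirroring the
 * "att_map != NULL" test in the patch.
 */
static bool
column_is_filtered_out(int attnum, const toy_bitmap *att_map,
                       toy_bitmap idattrs, bool replidentfull)
{
    if (att_map == NULL)        /* no column list: send everything */
        return false;
    if (replidentfull)          /* REPLICA IDENTITY FULL: send everything */
        return false;
    if (toy_is_member(attnum, idattrs)) /* identity columns are always sent */
        return false;

    return !toy_is_member(attnum, *att_map);
}
```

With one helper, every call site applies the same rule, so a place that forgets to check idattrs cannot silently diverge from the others.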


regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From fb5ce02d36b46f92ab01c9a823cc4e315cfcb73c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 14 Dec 2021 20:53:28 +0100
Subject: [PATCH 1/2] v9

---
 doc/src/sgml/ref/alter_publication.sgml      |   4 +-
 doc/src/sgml/ref/create_publication.sgml     |  11 +-
 src/backend/catalog/dependency.c             |   8 +-
 src/backend/catalog/objectaddress.c          |   8 +
 src/backend/catalog/pg_depend.c              |  50 ++++++
 src/backend/catalog/pg_publication.c         | 106 +++++++++++-
 src/backend/commands/publicationcmds.c       |  94 ++++++++++-
 src/backend/commands/tablecmds.c             |  10 +-
 src/backend/nodes/copyfuncs.c                |   1 +
 src/backend/nodes/equalfuncs.c               |   1 +
 src/backend/parser/gram.y                    |  36 ++++-
 src/backend/replication/logical/proto.c      |  97 ++++++++---
 src/backend/replication/logical/tablesync.c  | 117 +++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c  |  80 +++++++--
 src/bin/pg_dump/pg_dump.c                    |  41 ++++-
 src/bin/pg_dump/pg_dump.h                    |   1 +
 src/bin/psql/describe.c                      |  26 ++-
 src/bin/psql/tab-complete.c                  |   2 +
 src/include/catalog/dependency.h             |   3 +
 src/include/catalog/pg_publication.h         |   1 +
 src/include/catalog/pg_publication_rel.h     |   3 +
 src/include/commands/publicationcmds.h       |   2 +-
 src/include/nodes/parsenodes.h               |   1 +
 src/include/replication/logicalproto.h       |   6 +-
 src/test/regress/expected/publication.out    |  39 ++++-
 src/test/regress/sql/publication.sql         |  19 ++-
 src/test/subscription/t/021_column_filter.pl | 162 +++++++++++++++++++
 27 files changed, 857 insertions(+), 72 deletions(-)
 create mode 100644 src/test/subscription/t/021_column_filter.pl

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..c86055b93c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -110,6 +110,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      Optionally, a column list can be specified.  See <xref
+      linkend="sql-createpublication"/> for details.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      When a column list is specified, only the listed columns are replicated;
+      any other columns are ignored for the purpose of replication through
+      this publication.  If no column list is specified, all columns of the
+      table are replicated through this publication, including any columns
+      added later.  If a column list is specified, it must include the replica
+      identity columns.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index fe9c714257..a88d12e8ae 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -1472,7 +1472,7 @@ doDeletion(const ObjectAddress *object, int flags)
 			break;
 
 		case OCLASS_PUBLICATION_REL:
-			RemovePublicationRelById(object->objectId);
+			RemovePublicationRelById(object->objectId, object->objectSubId);
 			break;
 
 		case OCLASS_PUBLICATION:
@@ -2754,8 +2754,12 @@ free_object_addresses(ObjectAddresses *addrs)
 ObjectClass
 getObjectClass(const ObjectAddress *object)
 {
-	/* only pg_class entries can have nonzero objectSubId */
+	/*
+	 * only pg_class and pg_publication_rel entries can have nonzero
+	 * objectSubId
+	 */
 	if (object->classId != RelationRelationId &&
+		object->classId != PublicationRelRelationId &&
 		object->objectSubId != 0)
 		elog(ERROR, "invalid non-zero objectSubId for object class %u",
 			 object->classId);
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 2bae3fbb17..5eed248dcb 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -4019,6 +4019,7 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
 				/* translator: first %s is, e.g., "table %s" */
 				appendStringInfo(&buffer, _("publication of %s in publication %s"),
 								 rel.data, pubname);
+				/* FIXME add objectSubId support */
 				pfree(rel.data);
 				ReleaseSysCache(tup);
 				break;
@@ -5853,9 +5854,16 @@ getObjectIdentityParts(const ObjectAddress *object,
 
 				getRelationIdentity(&buffer, prform->prrelid, objname, false);
 				appendStringInfo(&buffer, " in publication %s", pubname);
+				if (object->objectSubId)	/* FIXME maybe get_attname */
+					appendStringInfo(&buffer, " column %d", object->objectSubId);
 
 				if (objargs)
+				{
 					*objargs = list_make1(pubname);
+					if (object->objectSubId)
+						*objargs = lappend(*objargs,
+										   psprintf("%d", object->objectSubId));
+				}
 
 				ReleaseSysCache(tup);
 				break;
diff --git a/src/backend/catalog/pg_depend.c b/src/backend/catalog/pg_depend.c
index 5f37bf6d10..dfcb450e61 100644
--- a/src/backend/catalog/pg_depend.c
+++ b/src/backend/catalog/pg_depend.c
@@ -658,6 +658,56 @@ isObjectPinned(const ObjectAddress *object)
  * Various special-purpose lookups and manipulations of pg_depend.
  */
 
+/*
+ * Find all objects of the given class that reference the specified object,
+ * and add them to the given ObjectAddresses.
+ */
+void
+findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId)
+{
+	Relation	depRel;
+	ScanKeyData	key[3];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	depRel = table_open(DependRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_depend_refclassid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refclassId));
+	ScanKeyInit(&key[1],
+				Anum_pg_depend_refobjid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refobjectId));
+	ScanKeyInit(&key[2],
+				Anum_pg_depend_refobjsubid,
+				BTEqualStrategyNumber, F_INT4EQ,
+				Int32GetDatum(refobjsubId));
+
+	scan = systable_beginscan(depRel, DependReferenceIndexId, true,
+							  NULL, 3, key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);
+		ObjectAddress	object;
+
+		if (depform->classid != classId)
+			continue;
+
+		ObjectAddressSubSet(object, depform->classid, depform->objid,
+							depform->refobjsubid);
+
+		add_exact_object_address(&object, addrs);
+	}
+
+	systable_endscan(scan);
+
+	table_close(depRel, AccessShareLock);
+}
+
 
 /*
  * Find the extension containing the specified object, if any
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..ae58adc8e5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -46,12 +46,18 @@
 #include "utils/syscache.h"
 
 /*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * columns is the bitmapset of columns specified as the column filter
+ * (shifted by FirstLowInvalidHeapAttributeNumber), or NULL if no column
+ * filter was specified.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 {
+	bool		replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +88,40 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/*
+	 * Enforce that the column filter can only leave out columns that aren't
+	 * forced to be sent.
+	 *
+	 * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+	 * columns need to be sent regardless); and in other cases, the columns in
+	 * the REPLICA IDENTITY cannot be left out.
+	 */
+	if (columns != NULL)
+	{
+		if (replidentfull)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+		else
+		{
+			Bitmapset  *idattrs;
+
+			idattrs = RelationGetIndexAttrBitmap(targetrel,
+												 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+			if (!bms_is_subset(idattrs, columns))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+						errmsg("invalid column list for publishing relation \"%s\"",
+							   RelationGetRelationName(targetrel)),
+						errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+
+			if (idattrs)
+				pfree(idattrs);
+		}
+	}
 }
 
 /*
@@ -289,9 +329,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
+	Bitmapset  *attmap = NULL;
+	AttrNumber *attarray;
+	int			natts = 0;
+	int			attnum;
 	ObjectAddress myself,
 				referenced;
 	List	   *relids = NIL;
+	ListCell   *lc;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -305,6 +350,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	{
 		table_close(rel, RowExclusiveLock);
 
+		/* FIXME need to handle the case of different column list */
+
 		if (if_not_exists)
 			return InvalidObjectAddress;
 
@@ -314,7 +361,34 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns));
+	foreach(lc, targetrel->columns)
+	{
+		char	   *colname = strVal(lfirst(lc));
+		AttrNumber	attnum = get_attnum(relid, colname);
+
+		if (attnum == InvalidAttrNumber)
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_COLUMN),
+					errmsg("column \"%s\" of relation \"%s\" does not exist",
+						   colname, RelationGetRelationName(targetrel->relation)));
+		if (attnum < 0)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("cannot reference system column \"%s\" in publication column list",
+						   colname));
+
+		if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attmap))
+			ereport(ERROR,
+					errcode(ERRCODE_DUPLICATE_OBJECT),
+					errmsg("column \"%s\" specified twice in publication column list",
+						   colname));
+
+		attmap = bms_add_member(attmap, attnum - FirstLowInvalidHeapAttributeNumber);
+		attarray[natts++] = attnum;
+	}
+
+	check_publication_add_relation(targetrel->relation, attmap);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -327,6 +401,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	if (targetrel->columns)
+	{
+		int2vector *prattrs;
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+	}
+	else
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -344,6 +427,21 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/*
+	 * If there's an explicit column list, make one dependency entry for each
+	 * column.  Note that the referencing side of the dependency is also
+	 * specific to one column, so that it can be dropped separately if the
+	 * column is dropped.
+	 */
+	while ((attnum = bms_first_member(attmap)) >= 0)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid,
+							attnum + FirstLowInvalidHeapAttributeNumber);
+		myself.objectSubId = attnum + FirstLowInvalidHeapAttributeNumber;
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+	myself.objectSubId = 0;		/* need to undo this bit */
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..a070914bdd 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -561,7 +561,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -757,10 +757,11 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 }
 
 /*
- * Remove relation from publication by mapping OID.
+ * Remove relation from publication by mapping OID, or publication status
+ * of one column of that relation in the publication if an attnum is given.
  */
 void
-RemovePublicationRelById(Oid proid)
+RemovePublicationRelById(Oid proid, int32 attnum)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -790,7 +791,81 @@ RemovePublicationRelById(Oid proid)
 
 	InvalidatePublicationRels(relids);
 
-	CatalogTupleDelete(rel, &tup->t_self);
+	/*
+	 * If no column is given, simply delete the relation from the publication.
+	 *
+	 * If a column is given, what we do instead is to remove that column from
+	 * the column list.  The relation remains in the publication, with the
+	 * other columns.  However, dropping the last column is disallowed.
+	 */
+	if (attnum == 0)
+	{
+		CatalogTupleDelete(rel, &tup->t_self);
+	}
+	else
+	{
+		Datum		adatum;
+		ArrayType  *arr;
+		int			nelems;
+		int16	   *elems;
+		int16	   *newelems;
+		int2vector *newvec;
+		Datum		values[Natts_pg_publication_rel];
+		bool		nulls[Natts_pg_publication_rel];
+		bool		replace[Natts_pg_publication_rel];
+		HeapTuple	newtup;
+		int			i,
+					j;
+		bool		isnull;
+
+		/* Obtain the original column list */
+		adatum = SysCacheGetAttr(PUBLICATIONRELMAP,
+								 tup,
+								 Anum_pg_publication_rel_prattrs,
+								 &isnull);
+		if (isnull)			/* shouldn't happen */
+			elog(ERROR, "can't drop column from publication without a column list");
+		arr = DatumGetArrayTypeP(adatum);
+		nelems = ARR_DIMS(arr)[0];
+		elems = (int16 *) ARR_DATA_PTR(arr);
+
+		/* Construct a list excluding the given column */
+		newelems = palloc(sizeof(int16) * nelems);
+		for (i = 0, j = 0; i < nelems; i++)
+		{
+			if (elems[i] == attnum)
+				continue;
+			newelems[j++] = elems[i];
+		}
+
+		/*
+		 * If this is the last column used in the publication, disallow the
+		 * command. We could alternatively just drop the relation from the
+		 * publication.
+		 */
+		if (j == 0)
+		{
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot drop the last column in publication \"%s\"",
+						   get_publication_name(pubrel->prpubid, false)),
+					errhint("Remove table \"%s\" from the publication first.",
+							get_rel_name(pubrel->prrelid)));
+		}
+
+		/* Build the updated tuple */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, false, sizeof(nulls));
+		MemSet(replace, false, sizeof(replace));
+		newvec = buildint2vector(newelems, j);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(newvec);
+		replace[Anum_pg_publication_rel_prattrs - 1] = true;
+
+		/* Execute the update */
+		newtup = heap_modify_tuple(tup, RelationGetDescr(rel),
+								   values, nulls, replace);
+		CatalogTupleUpdate(rel, &tup->t_self, newtup);
+	}
 
 	ReleaseSysCache(tup);
 
@@ -932,6 +1007,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = t->columns;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -965,8 +1042,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = t->columns;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -1074,6 +1154,12 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		if (pubrel->columns)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("column list may not be specified for relation \"%s\" in ALTER PUBLICATION ... SET/DROP command",
+						   RelationGetRelationName(pubrel->relation)));
+
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 47b29001d5..7207dcf9c0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,9 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
+#include "catalog/pg_publication_rel.h"
 #include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -8420,6 +8421,13 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * If the column is part of a replication column list, arrange to get that
+	 * removed too.
+	 */
+	findAndAddAddresses(addrs, PublicationRelRelationId,
+						RelationRelationId, RelationGetRelid(rel), attnum);
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..4dad6fedfb 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9742,12 +9742,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->columns = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9762,28 +9763,38 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2 != NULL)
+					{
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->columns = $2;
+						$$->name = NULL;
+					}
+					else
+						$$->name = $1;
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->columns = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->columns = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -17435,8 +17446,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
-				pubtable->relation = makeRangeVar(NULL, pubobj->name,
-												  pubobj->location);
+
+				pubtable->relation =
+					makeRangeVar(NULL, pubobj->name, pubobj->location);
 				pubobj->pubtable = pubtable;
 				pubobj->name = NULL;
 			}
@@ -17444,6 +17456,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/*
+			 * This can happen if a column list is specified in a continuation
+			 * for a schema entry; reject it.
+			 */
+			if (pubobj->pubtable)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("column specification not allowed for schemas"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..15d8192238 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,9 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary, Bitmapset *att_map);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, att_map);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map)
 {
 	char	   *relname;
 
@@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, att_map);
 }
 
 /*
@@ -749,20 +749,42 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary,
+					   Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
 	bool		isnull[MaxTupleAttributeNumber];
 	int			i;
 	uint16		nliveatts = 0;
+	Bitmapset  *idattrs = NULL;
+	bool		replidentfull;
+	Form_pg_attribute att;
 
 	desc = RelationGetDescr(rel);
 
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	for (i = 0; i < desc->natts; i++)
 	{
+		att = TupleDescAttr(desc, i);
 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
 			continue;
+
+		/*
+		 * Do not increment count of attributes if not a part of column
+		 * filters except for replica identity columns or if replica identity
+		 * is full.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs) &&
+			!replidentfull)
+			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -800,6 +822,19 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 			continue;
 		}
 
+		/*
+		 * Do not send attribute data if it is not a part of column filters,
+		 * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is
+		 * full, send the data.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs) &&
+			!replidentfull)
+			continue;
+
 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
 		if (!HeapTupleIsValid(typtup))
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
@@ -904,7 +939,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +949,35 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
+	/* fetch bitmap of REPLICA IDENTITY attributes */
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	/* send number of live attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+		if (replidentfull ||
+			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+		{
+			nliveatts++;
+			continue;
+		}
+		/* Skip sending if not a part of column filter */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map))
 			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
 
-	/* fetch bitmap of REPLICATION IDENTITY attributes */
-	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
-	if (!replidentfull)
-		idattrs = RelationGetIdentityKeyBitmap(rel);
-
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -937,6 +987,17 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/*
+		 * Exclude filtered columns, but REPLICA IDENTITY columns can't be
+		 * excluded.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs) &&
+			!replidentfull)
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..15902faf56 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -695,19 +696,25 @@ fetch_remote_table_info(char *nspname, char *relname,
 						LogicalRepRelation *lrel)
 {
 	WalRcvExecResult *res;
+	WalRcvExecResult *res_pub;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
+	TupleTableSlot *slot_pub;
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			pubRow[] = {TEXTARRAYOID};
 	bool		isnull;
 	int			natt;
+	List	   *pub_columns = NIL;
+	ListCell   *lc;
+	bool		am_partition = false;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,6 +744,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull));
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
@@ -774,11 +782,101 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+
+	/*
+	 * Now, fetch the publications' column filters for this table.
+	 *
+	 * For a partition, pg_publication_rel may contain an entry only for an
+	 * ancestor (e.g. the topmost parent) rather than for the partition
+	 * itself.  So look up the root with pg_partition_root() and retrieve the
+	 * pg_publication_rel records for any relation in that partition tree,
+	 * as returned by pg_partition_tree().
+	 */
+	resetStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT CASE WHEN prattrs IS NOT NULL THEN\n"
+						   "           ARRAY(SELECT attname\n"
+						   "                   FROM pg_catalog.generate_series(0, pg_catalog.array_upper(prattrs::int[], 1)) s,\n"
+						   "                        pg_catalog.pg_attribute\n"
+						   "                  WHERE attrelid = prrelid AND attnum = prattrs[s])\n"
+						   "           ELSE NULL END AS columns\n"
+						   "FROM pg_catalog.pg_publication_rel\n");
+	if (!am_partition)
+		appendStringInfo(&cmd, "WHERE prrelid = %u", lrel->remoteid);
+	else
+		appendStringInfo(&cmd,
+						 "WHERE prrelid IN (SELECT relid \n"
+						 "FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+						 lrel->remoteid);
+
+	res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+						  lengthof(pubRow), pubRow);
+
+	if (res_pub->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res_pub->err)));
+	slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub))
+	{
+		Datum		adatum;
+		Datum	   *elems;
+		bool	   *nulls;
+		int			nelems;
+
+		adatum = slot_getattr(slot_pub, 1, &isnull);
+		if (isnull)			/* shouldn't happen */
+			elog(ERROR, "unexpected null value in publication column filter");
+		deconstruct_array(DatumGetArrayTypeP(adatum),
+						  TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, &nulls, &nelems);
+		for (int i = 0; i < nelems; i++)
+		{
+			if (nulls[i])	/* shouldn't happen */
+				elog(ERROR, "unexpected null value in publication column filter");
+			pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i]));
+		}
+		ExecClearTuple(slot_pub);
+	}
+	ExecDropSingleTupleTableSlot(slot_pub);
+	walrcv_clear_result(res_pub);
+
+	/*
+	 * Store the column names only if they are contained in the column
+	 * filter.  LogicalRepRelation will only contain attributes corresponding
+	 * to those specified in column filters.
+	 */
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
-			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		char	   *rel_colname;
+		bool		found = false;
+
+		rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
+		if (pub_columns != NIL)
+		{
+			foreach(lc, pub_columns)
+			{
+				char	   *pub_colname = lfirst(lc);
+
+				if (!strcmp(pub_colname, rel_colname))
+				{
+					found = true;
+					lrel->attnames[natt] = rel_colname;
+					break;
+				}
+			}
+		}
+		else
+		{
+			found = true;
+			lrel->attnames[natt] = rel_colname;
+		}
+		if (!found)
+			continue;
+
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
@@ -829,8 +927,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfoString(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..f9f9ecd0c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *att_map);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,7 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+	Bitmapset  *att_map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->att_map);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+						Bitmapset *att_map)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -610,13 +616,25 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
+		/*
+		 * Do not send type information if the attribute is not present in the
+		 * column filter.  XXX Allow sending type information for REPLICA
+		 * IDENTITY columns with user-created types, even when they are not
+		 * mentioned in column filters.
+		 *
+		 * FIXME -- this code does not seem to be verified by tests.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map))
+			continue;
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, att_map);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +711,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +740,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1122,6 +1140,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid			ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1142,6 +1161,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
+		entry->att_map = NULL;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
 	}
@@ -1182,6 +1202,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1192,8 +1213,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
-
 				/*
 				 * For a partition, check if any of the ancestors are
 				 * published.  If so, note down the topmost ancestor that is
@@ -1219,6 +1238,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1239,15 +1259,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				Oid			pub_relid;
+				HeapTuple	pub_rel_tuple;
+
+				pub_relid = ancestor_published ? ancestor_id : publish_as_relid;
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+												ObjectIdGetDatum(pub_relid),
+												ObjectIdGetDatum(pub->oid));
+
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					Datum		pub_rel_cols;
+					bool		isnull;
+
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+												   pub_rel_tuple,
+												   Anum_pg_publication_rel_prattrs,
+												   &isnull);
+					if (!isnull)
+					{
+						ArrayType  *arr;
+						int			nelems;
+						int16	   *elems;
+
+						arr = DatumGetArrayTypeP(pub_rel_cols);
+						nelems = ARR_DIMS(arr)[0];
+						elems = (int16 *) ARR_DATA_PTR(arr);
+
+						/* XXX is there a danger of memory leak here? beware */
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						for (int i = 0; i < nelems; i++)
+							entry->att_map = bms_add_member(entry->att_map,
+															elems[i] - FirstLowInvalidHeapAttributeNumber);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
-
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1343,6 +1395,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->att_map);
+		entry->att_map = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 10a86f9810..0c438481dc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4265,6 +4265,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prattrs;
 	int			i,
 				j,
 				ntups;
@@ -4276,8 +4277,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, ", prattrs");
+	else
+		appendPQExpBufferStr(query, ", NULL as prattrs");
+	appendPQExpBufferStr(query,
+						 " FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4286,6 +4292,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prattrs = PQfnumber(res, "prattrs");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4327,6 +4334,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
 
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+							  &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			pubrinfo[j].pubrattrs = attribs->data;
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
 
@@ -4391,10 +4420,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
-					  fmtQualifiedDumpable(tbinfo));
+	appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrattrs)
+		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 6dccb4be4e..50a5b885f6 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -633,6 +633,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrattrs;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 72d8547628..46fa616406 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6302,7 +6302,7 @@ listPublications(const char *pattern)
  */
 static bool
 addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
-						   bool singlecol, printTableContent *cont)
+						   bool as_schema, printTableContent *cont)
 {
 	PGresult   *res;
 	int			count = 0;
@@ -6319,10 +6319,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 
 	for (i = 0; i < count; i++)
 	{
-		if (!singlecol)
+		if (!as_schema)			/* as table */
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
-		else
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+		}
+		else					/* as schema */
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
 		printTableAddFooter(cont, buf->data);
@@ -6450,8 +6454,20 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname, \n");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 "       CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+									 "       pg_catalog.array_to_string"
+									 "(ARRAY(SELECT attname\n"
+									 "         FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
+									 "              pg_catalog.pg_attribute\n"
+									 "        WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+									 "       ELSE NULL END AS columns");
+			else
+				appendPQExpBufferStr(&buf, "NULL as columns");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..84ee807e0b 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 3eca295ff4..76d421e09e 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -214,6 +214,9 @@ extern long changeDependenciesOf(Oid classId, Oid oldObjectId,
 extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
 								 Oid newRefObjectId);
 
+extern void findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId);
+
 extern Oid	getExtensionOfObject(Oid classId, Oid objectId);
 extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..f5ae2065e9 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	   *columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..7ad285faae 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	int2vector	prattrs;		/* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 4ba68c70ee..23f037df7f 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -25,7 +25,7 @@
 extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
 extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
 extern void RemovePublicationById(Oid pubid);
-extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationRelById(Oid proid, int32 attnum);
 extern void RemovePublicationSchemaById(Oid psoid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..02b547d044 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..709b4be916 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *att_map);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..84afe0ebef 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,35 @@ Publications:
  regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+                                Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_tbl5" (a)
+Tables from schemas:
+    "pub_test"
+
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+ERROR:  cannot drop the last column in publication "testpub_fortable"
+HINT:  Remove table "testpub_tbl5" from the publication first.
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
 CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -669,6 +697,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
 Tables from schemas:
     "pub_test1"
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR:  syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+                                                                ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR:  column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+                                                             ^
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..200158ba69 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,20 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 \d+ testpub_tbl2
 \dRp+ testpub_foralltables
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 
 CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +375,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..354e6ac363
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,162 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test column filtering in logical replication
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+# Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'only columns a and b are replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column c is not replicated');
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2");
+is($result, qq(1|abc), 'insert on column c is not replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 1");
+is($result, qq(1|abc), 'update on column c is not replicated');
+
+# Test behavior when a column is dropped
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_part DROP COLUMN b");
+$result = $node_publisher->safe_psql('postgres',
+	"select prrelid::regclass, prattrs from pg_publication_rel pb;");
+is($result,
+	q(tab1|1 2
+tab3|1 3
+tab2|1 2
+test_part|1), 'column test_part.b removed');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (3, '2021-12-13 12:13:14')");
+$node_publisher->wait_for_catchup('sub1');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part WHERE a = 3");
+is($result, "3|", 'only column a is replicated');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab4 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab4 (a, d)");
+$node_subscriber->safe_psql('postgres',    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab4;"),
+   qq(1|11|1111
+2|22|2222),
+   'overlapping publications with overlapping column lists');
-- 
2.31.1

From a1cb64a17b35dd75daf22a9f59da176ed109612a Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 14 Dec 2021 22:11:10 +0100
Subject: [PATCH 2/2] review

---
 src/backend/catalog/pg_publication.c        | 60 +++++++++++++++------
 src/backend/commands/publicationcmds.c      | 12 +++++
 src/backend/commands/tablecmds.c            |  2 +-
 src/backend/replication/logical/proto.c     |  8 +++
 src/backend/replication/pgoutput/pgoutput.c |  8 +++
 5 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ae58adc8e5..7a478b7072 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -99,28 +99,27 @@ check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 	 */
 	if (columns != NULL)
 	{
+		Bitmapset  *idattrs;
+
 		if (replidentfull)
 			ereport(ERROR,
 					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					errmsg("invalid column list for publishing relation \"%s\"",
 						   RelationGetRelationName(targetrel)),
 					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
-		else
-		{
-			Bitmapset  *idattrs;
-
-			idattrs = RelationGetIndexAttrBitmap(targetrel,
-												 INDEX_ATTR_BITMAP_IDENTITY_KEY);
-			if (!bms_is_subset(idattrs, columns))
-				ereport(ERROR,
-						errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-						errmsg("invalid column list for publishing relation \"%s\"",
-							   RelationGetRelationName(targetrel)),
-						errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
-
-			if (idattrs)
-				pfree(idattrs);
-		}
+
+		/* XXX The else was unnecessary, because the "if" always errors out */
+		idattrs = RelationGetIndexAttrBitmap(targetrel,
+											 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+		if (!bms_is_subset(idattrs, columns))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+
+		if (idattrs)
+			pfree(idattrs);
 	}
 }
 
@@ -352,6 +351,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 
 		/* FIXME need to handle the case of different column list */
 
+		/*
+		 * XXX So what's the right behavior for ADD TABLE with different column
+		 * list? I'd say we should allow that, and that it should be mostly the
+		 * same thing as adding/removing columns to the list incrementally, i.e.
+		 * we should replace the column lists. We could also prohibit it, but that
+		 * seems like a really annoying limitation, forcing people to remove/add
+		 * the relation.
+		 */
+
 		if (if_not_exists)
 			return InvalidObjectAddress;
 
@@ -361,6 +369,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
+	/*
+	 * Translate list of columns to attnums. We prohibit system attributes and
+	 * make sure there are no duplicate columns.
+	 */
 	attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns));
 	foreach(lc, targetrel->columns)
 	{
@@ -372,12 +384,23 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 					errcode(ERRCODE_UNDEFINED_COLUMN),
 					errmsg("column \"%s\" of relation \"%s\" does not exist",
 						   colname, RelationGetRelationName(targetrel->relation)));
-		if (attnum < 0)
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
 			ereport(ERROR,
 					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
 					errmsg("cannot reference system column \"%s\" in publication column list",
 						   colname));
 
+		/*
+		 * XXX The offset seems kinda pointless, because at this point we're not
+		 * dealing with system attributes, so all attnums are > 0. Are we comparing
+		 * it to arbitrary attnums later? Dropping the offset would also simplify
+		 * adding the deps later, where we currently have to offset it back.
+		 *
+		 * XXX I wouldn't say "twice" though. It can be specified multiple times,
+		 * it's just that we discover it during the second occurrence. I'd say
+		 * "duplicate column name in publication column list" instead.
+		 */
 		if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attmap))
 			ereport(ERROR,
 					errcode(ERRCODE_DUPLICATE_OBJECT),
@@ -417,6 +440,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	CatalogTupleInsert(rel, tup);
 	heap_freetuple(tup);
 
+	/* XXX not sure if worth an explicit free, but if we free the tuple ... */
+	pfree(attarray);
+
 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
 
 	/* Add dependency on the publication */
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index a070914bdd..38efc5ad59 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -759,6 +759,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 /*
  * Remove relation from publication by mapping OID, or publication status
  * of one column of that relation in the publication if an attnum is given.
+ *
+ * XXX What's "publication status"?
  */
 void
 RemovePublicationRelById(Oid proid, int32 attnum)
@@ -825,6 +827,9 @@ RemovePublicationRelById(Oid proid, int32 attnum)
 								 &isnull);
 		if (isnull)			/* shouldn't happen */
 			elog(ERROR, "can't drop column from publication without a column list");
+
+		/* XXX We're reading the array in multiple places, so maybe move that
+		 * to a separate function? */
 		arr = DatumGetArrayTypeP(adatum);
 		nelems = ARR_DIMS(arr)[0];
 		elems = (int16 *) ARR_DATA_PTR(arr);
@@ -833,6 +838,7 @@ RemovePublicationRelById(Oid proid, int32 attnum)
 		newelems = palloc(sizeof(int16) * nelems - 1);
 		for (i = 0, j = 0; i < nelems - 1; i++)
 		{
+			/* XXX Can it happen that we never find a match? Seems like an error. */
 			if (elems[i] == attnum)
 				continue;
 			newelems[j++] = elems[i];
@@ -842,6 +848,10 @@ RemovePublicationRelById(Oid proid, int32 attnum)
 		 * If this is the last column used in the publication, disallow the
 		 * command. We could alternatively just drop the relation from the
 		 * publication.
+		 *
+		 * XXX Alternatively we could switch to "replicate all columns", but
+		 * that seems counterintuitive. However, is there a way to switch
+		 * between these two modes, somehow?
 		 */
 		if (j == 0)
 		{
@@ -1154,6 +1164,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		/* XXX Shouldn't this be prevented by the grammar, ideally? Can it actually
+		 * happen? It does not seem to be tested in the regression tests. */
 		if (pubrel->columns)
 			ereport(ERROR,
 					errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 7207dcf9c0..f7c21158c0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8422,7 +8422,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	ReleaseSysCache(tuple);
 
 	/*
-	 * If the column is part of a replication column list, arrange to get that
+	 * If the column is part of a publication column list, arrange to get that
 	 * removed too.
 	 */
 	findAndAddAddresses(addrs, PublicationRelRelationId,
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 15d8192238..885acd6c9e 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -777,6 +777,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 		 * Do not increment count of attributes if not a part of column
 		 * filters except for replica identity columns or if replica identity
 		 * is full.
+		 *
+		 * XXX This exact check is done in multiple places, so maybe let's move it
+		 * to a separate function and call it. Also, att_map is for user attrs only
+		 * so maybe we can get rid of the offsets?
+		 *
+		 * XXX Why do we need to check both att_map and idattrs? Aren't we enforcing
+		 * that idattrs is always a subset of att_map?
 		 */
 		if (att_map != NULL &&
 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
@@ -970,6 +977,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
 			continue;
 		}
 		/* Skip sending if not a part of column filter */
+		/* XXX How come this is not checking idattrs and replidentfull? */
 		if (att_map != NULL &&
 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
 						   att_map))
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f9f9ecd0c0..551c4d5315 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -134,6 +134,14 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+
+	/* XXX Needs a comment explaining what att_map does, how it's different
+	 * from map. The naming seems really confusing and it's a bit unclear how
+	 * these two bits interact (i.e. partitioning + column list).
+	 *
+	 * XXX In fact, is it really mapping anything? It seems more like a simple
+	 * list of attnums, translated from list of names.
+	 */
 	Bitmapset  *att_map;
 } RelationSyncEntry;
 
-- 
2.31.1

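On the "can it happen that we never find a match?" question in RemovePublicationRelById: below is a standalone sketch of the removal loop that reports a missing attnum to the caller instead of silently ignoring it. remove_attnum is a hypothetical helper name, and malloc() stands in for palloc() so that it compiles outside the backend. Note that the sketch walks all nelems entries. As quoted in the hunk, the loop stops at nelems - 1, which looks like it would skip the last element.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical helper: build a copy of elems[] without attnum.
 *
 * Returns true and sets *newelems / *nnew on success. Returns false and
 * leaves the outputs untouched if attnum is not present in elems[], so the
 * caller can raise an error.
 */
static bool
remove_attnum(const int16_t *elems, int nelems, int16_t attnum,
			  int16_t **newelems, int *nnew)
{
	int16_t    *result;
	bool		found = false;
	int			i;
	int			j = 0;

	assert(nelems > 0);

	/* Room for all entries, in case no match is found. */
	result = malloc(sizeof(int16_t) * nelems);
	if (result == NULL)
		abort();

	for (i = 0; i < nelems; i++)
	{
		if (elems[i] == attnum)
		{
			found = true;
			continue;
		}
		result[j++] = elems[i];
	}

	if (!found)
	{
		free(result);
		return false;
	}

	*newelems = result;
	*nnew = j;
	return true;
}
```

The caller would still need the "j == 0" check for the last remaining column. This only makes the no-match case explicit.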