OK, getting closer now.  I've fixed the code to filter the column list
during the initial sync, and added some more tests for code that wasn't
covered.

There are still some XXX comments.  The one that bothers me most is the
lack of an implementation that allows changing the column list in a
publication without having to remove the table from the publication
first.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
"I'm always right, but sometimes I'm more right than other times."
                                                  (Linus Torvalds)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 34a7034282..5bc2e7a591 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6877,7 +6877,9 @@ Relation
 </listitem>
 </varlistentry>
 </variablelist>
-        Next, the following message part appears for each column (except generated columns):
+        Next, the following message part appears for each column (except
+        generated columns and other columns that don't appear in the column
+        filter list, for tables that have one):
 <variablelist>
 <varlistentry>
 <term>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..c86055b93c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -110,6 +110,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      Optionally, a column list can be specified.  See <xref
+      linkend="sql-createpublication"/> for details.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      When a column list is specified, only the listed columns are replicated;
+      any other columns are ignored for the purpose of replication through
+      this publication.  If no column list is specified, all columns of the
+      table are replicated through this publication, including any columns
+      added later.  If a column list is specified, it must include the replica
+      identity columns.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..88e94a7cda 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -46,12 +46,17 @@
 #include "utils/syscache.h"
 
 /*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * columns is the bitmapset of columns specified as column filter, or NULL if
+ * no column filter was specified.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 {
+	bool		replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +87,55 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/*
+	 * Enforce that the column filter can only leave out columns that aren't
+	 * forced to be sent.
+	 *
+	 * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+	 * columns need to be sent regardless); and in other cases, the columns in
+	 * the REPLICA IDENTITY cannot be left out.
+	 */
+	if (columns != NULL)
+	{
+		Bitmapset  *idattrs;
+		int			x;
+
+		/*
+		 * Even if the user listed all columns in the column list, we cannot
+		 * allow a column list to be specified when REPLICA IDENTITY is FULL;
+		 * that would cause problems if a new column is added later, because
+		 * that column would have to be included (because of being part of the
+		 * replica identity) but it's technically not allowed (because of not
+		 * being in the publication's column list yet).  So reject this case
+		 * altogether.
+		 */
+		if (replidentfull)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+		idattrs = RelationGetIndexAttrBitmap(targetrel,
+											 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+		/*
+		 * We have to test membership the hard way, because the values returned
+		 * by RelationGetIndexAttrBitmap are offset.
+		 */
+		x = -1;
+		while ((x = bms_next_member(idattrs, x)) >= 0)
+		{
+			if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+						errmsg("invalid column list for publishing relation \"%s\"",
+							   RelationGetRelationName(targetrel)),
+						errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+		}
+
+		bms_free(idattrs);
+	}
 }
 
 /*
@@ -289,9 +343,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
+	Bitmapset  *attmap = NULL;
+	AttrNumber *attarray;
+	int			natts = 0;
 	ObjectAddress myself,
 				referenced;
 	List	   *relids = NIL;
+	ListCell   *lc;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -305,6 +363,17 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	{
 		table_close(rel, RowExclusiveLock);
 
+		/* FIXME need to handle the case of different column list */
+
+		/*
+		 * XXX So what's the right behavior for ADD TABLE with different column
+		 * list? I'd say we should allow that, and that it should be mostly the
+		 * same thing as adding/removing columns to the list incrementally, i.e.
+		 * we should replace the column lists. We could also prohibit, but that
+		 * seems like a really annoying limitation, forcing people to remove/add
+		 * the relation.
+		 */
+
 		if (if_not_exists)
 			return InvalidObjectAddress;
 
@@ -314,7 +383,45 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	/*
+	 * Translate list of columns to attnums. We prohibit system attributes and
+	 * make sure there are no duplicate columns.
+	 *
+	 * Note that the attribute numbers are *not* offset by
+	 * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
+	 * should be okay.
+	 */
+	attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns));
+	foreach(lc, targetrel->columns)
+	{
+		char	   *colname = strVal(lfirst(lc));
+		AttrNumber	attnum = get_attnum(relid, colname);
+
+		if (attnum == InvalidAttrNumber)
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_COLUMN),
+					errmsg("column \"%s\" of relation \"%s\" does not exist",
+						   colname, RelationGetRelationName(targetrel->relation)));
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("cannot reference system column \"%s\" in publication column list",
+						   colname));
+
+		if (bms_is_member(attnum, attmap))
+			ereport(ERROR,
+					errcode(ERRCODE_DUPLICATE_OBJECT),
+					errmsg("duplicate column \"%s\" in publication column list",
+						   colname));
+
+		attmap = bms_add_member(attmap, attnum);
+		attarray[natts++] = attnum;
+	}
+
+	check_publication_add_relation(targetrel->relation, attmap);
+
+	bms_free(attmap);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -327,6 +434,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	if (targetrel->columns)
+	{
+		int2vector *prattrs;
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+	}
+	else
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -334,8 +450,16 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	CatalogTupleInsert(rel, tup);
 	heap_freetuple(tup);
 
+	/* Register dependencies as needed */
 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
 
+	/* Add dependency on the columns, if any are listed */
+	for (int i = 0; i < natts; i++)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+	}
+	pfree(attarray);
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
@@ -470,6 +594,74 @@ GetRelationPublications(Oid relid)
 	return result;
 }
 
+/*
+ * Gets a list of OIDs of all column-partial publications of the given
+ * relation, that is, those that specify a column list.
+ */
+List *
+GetRelationColumnPartialPublications(Oid relid)
+{
+	CatCList   *pubrellist;
+	List	   *pubs = NIL;
+
+	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
+									 ObjectIdGetDatum(relid));
+	for (int i = 0; i < pubrellist->n_members; i++)
+	{
+		HeapTuple	tup = &pubrellist->members[i]->tuple;
+		bool		isnull;
+
+		(void) SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							   Anum_pg_publication_rel_prattrs,
+							   &isnull);
+		if (isnull)
+			continue;
+
+		pubs = lappend_oid(pubs,
+						   ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid);
+	}
+
+	ReleaseSysCacheList(pubrellist);
+
+	return pubs;
+}
+
+/*
+ * For a relation in a publication that is known to have a non-null column
+ * list, return the list of attribute numbers that are in it.
+ */
+List *
+GetRelationColumnListInPublication(Oid relid, Oid pubid)
+{
+	HeapTuple	tup;
+	Datum		adatum;
+	bool		isnull;
+	ArrayType  *arr;
+	int			nelems;
+	int16	   *elems;
+	List	   *attnos = NIL;
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(pubid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid);
+	adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							 Anum_pg_publication_rel_prattrs, &isnull);
+	if (isnull)
+		elog(ERROR, "found unexpected null in pg_publication_rel.prattrs");
+	arr = DatumGetArrayTypeP(adatum);
+	nelems = ARR_DIMS(arr)[0];
+	elems = (int16 *) ARR_DATA_PTR(arr);
+
+	for (int i = 0; i < nelems; i++)
+		attnos = lappend_oid(attnos, elems[i]);
+
+	ReleaseSysCache(tup);
+
+	return attnos;
+}
+
 /*
  * Gets list of relation oids for a publication.
  *
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..cd2c6a0f70 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -561,7 +561,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				/* This is not needed to delete a table */
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -932,6 +933,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = t->columns;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -965,8 +968,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = t->columns;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -1074,6 +1080,14 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		/* XXX Shouldn't this be prevented by the grammar, ideally? Can it actually
+		 * happen? It does not seem to be tested in the regression tests. */
+		if (pubrel->columns)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("column list may not be specified for relation \"%s\" in ALTER PUBLICATION ... SET/DROP command",
+						   RelationGetRelationName(pubrel->relation)));
+
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 45e59e3d5c..a9051eb5e7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,8 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
 #include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -8347,6 +8347,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 				 bool missing_ok, LOCKMODE lockmode,
 				 ObjectAddresses *addrs)
 {
+	Oid			relid = RelationGetRelid(rel);
 	HeapTuple	tuple;
 	Form_pg_attribute targetatt;
 	AttrNumber	attnum;
@@ -8366,7 +8367,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	/*
 	 * get the number of the attribute
 	 */
-	tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+	tuple = SearchSysCacheAttName(relid, colName);
 	if (!HeapTupleIsValid(tuple))
 	{
 		if (!missing_ok)
@@ -8420,13 +8421,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * Also, if the column is used in the column list of a publication,
+	 * disallow the drop if the DROP is RESTRICT.  We don't do anything if the
+	 * DROP is CASCADE, which means that the dependency mechanism will remove
+	 * the relation from the publication.
+	 */
+	if (behavior == DROP_RESTRICT)
+	{
+		List	   *pubs;
+		ListCell   *lc;
+
+		pubs = GetRelationColumnPartialPublications(relid);
+		foreach(lc, pubs)
+		{
+			Oid		pubid = lfirst_oid(lc);
+			List   *published_cols;
+
+			published_cols =
+				GetRelationColumnListInPublication(relid, pubid);
+
+			if (list_member_oid(published_cols, attnum))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+						errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"",
+							   colName, get_publication_name(pubid, false)),
+						errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication."));
+		}
+	}
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
 	children =
-		find_inheritance_children(RelationGetRelid(rel), lockmode);
+		find_inheritance_children(relid, lockmode);
 
 	if (children)
 	{
@@ -8514,7 +8544,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	/* Add object to delete */
 	object.classId = RelationRelationId;
-	object.objectId = RelationGetRelid(rel);
+	object.objectId = relid;
 	object.objectSubId = attnum;
 	add_exact_object_address(&object, addrs);
 
@@ -15603,6 +15633,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	Oid			indexOid;
 	Relation	indexRel;
 	int			key;
+	List	   *pubs;
+	Bitmapset  *indexed_cols = NULL;
+	ListCell   *lc;
+
+	pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel));
 
 	if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
 	{
@@ -15611,11 +15646,16 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
 	{
+		if (pubs != NIL)
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot set REPLICA IDENTITY FULL when column-partial publications exist"));
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
 	{
+		/* XXX not sure what's the right check for publications here */
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
@@ -15626,7 +15666,6 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	else
 		elog(ERROR, "unexpected identity type %u", stmt->identity_type);
 
-
 	/* Check that the index exists */
 	indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace);
 	if (!OidIsValid(indexOid))
@@ -15701,6 +15740,38 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 					 errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
 							RelationGetRelationName(indexRel),
 							NameStr(attr->attname))));
+
+		/*
+		 * Collect columns used, in case we have any publications that we need
+		 * to vet.  System attributes are disallowed so no need to subtract
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		indexed_cols = bms_add_member(indexed_cols, attno);
+	}
+
+	/*
+	 * Check column-partial publications.  All publications have to include all
+	 * key columns of the new index.
+	 */
+	foreach(lc, pubs)
+	{
+		Oid			pubid = lfirst_oid(lc);
+		List	   *published_cols;
+
+		published_cols =
+			GetRelationColumnListInPublication(RelationGetRelid(rel), pubid);
+
+		for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
+		{
+			int16	attno = indexRel->rd_index->indkey.values[key];
+
+			if (!list_member_oid(published_cols, attno))
+				ereport(ERROR,
+						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns",
+							   RelationGetRelationName(indexRel),
+							   get_publication_name(pubid, false)));
+		}
 	}
 
 	/* This index is suitable for use as a replica identity. Mark it. */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..4dad6fedfb 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9742,12 +9742,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->columns = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9762,28 +9763,38 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2 != NULL)
+					{
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->columns = $2;
+						$$->name = NULL;
+					}
+					else
+						$$->name = $1;
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->columns = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->columns = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -17435,8 +17446,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
-				pubtable->relation = makeRangeVar(NULL, pubobj->name,
-												  pubobj->location);
+
+				pubtable->relation =
+					makeRangeVar(NULL, pubobj->name, pubobj->location);
 				pubobj->pubtable = pubtable;
 				pubobj->name = NULL;
 			}
@@ -17444,6 +17456,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/*
+			 * This can happen if a column list is specified in a continuation
+			 * for a schema entry; reject it.
+			 */
+			if (pubobj->pubtable)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("column specification not allowed for schemas"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..3428984130 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,11 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary,
+								   Bitmapset *columns);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, columns);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+					 Bitmapset *columns)
 {
 	char	   *relname;
 
@@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, columns);
 }
 
 /*
@@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple,
+					   bool binary, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 
 	for (i = 0; i < desc->natts; i++)
 	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		/* Don't count attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
 			continue;
 		nliveatts++;
 	}
@@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/* Ignore attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		if (isnull[i])
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
-	/* send number of live attributes */
-	for (i = 0; i < desc->natts; i++)
-	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
-			continue;
-		nliveatts++;
-	}
-	pq_sendint16(out, nliveatts);
-
 	/* fetch bitmap of REPLICATION IDENTITY attributes */
 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 	if (!replidentfull)
 		idattrs = RelationGetIdentityKeyBitmap(rel);
 
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 		if (att->attisdropped || att->attgenerated)
 			continue;
-
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..1303e85851 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -697,17 +698,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		am_partition = false;
+	Bitmapset  *included_cols = NULL;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,14 +741,18 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull));
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
 
-	/* Now fetch columns. */
+	/*
+	 * Now fetch column names and types.
+	 */
 	resetStringInfo(&cmd);
 	appendStringInfo(&cmd,
-					 "SELECT a.attname,"
+					 "SELECT a.attnum,"
+					 "       a.attname,"
 					 "       a.atttypid,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
@@ -772,16 +780,92 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
 
+	/*
+	 * In server versions 15 and higher, obtain the applicable column filter,
+	 * if any.
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		WalRcvExecResult *pubres;
+		TupleTableSlot *slot;
+		Oid			attrsRow[] = {INT2OID};
+		StringInfoData publications;
+		bool		first = true;
+
+		initStringInfo(&publications);
+		foreach(lc, MySubscription->publications)
+		{
+			if (!first)
+				appendStringInfo(&publications, ", ");
+			appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc))));
+			first = false;
+		}
+
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "  SELECT pg_catalog.unnest(prattrs)\n"
+						 "    FROM pg_catalog.pg_publication p JOIN\n"
+						 "         pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n"
+						 "   WHERE p.pubname IN (%s) AND\n",
+						 publications.data);
+		if (!am_partition)
+			appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid);
+		else
+			appendStringInfo(&cmd,
+							 "prrelid IN (SELECT relid\n"
+							 "    FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+							 lrel->remoteid);
+
+		pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+							 lengthof(attrsRow), attrsRow);
+
+		if (pubres->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s",
+							nspname, relname, pubres->err)));
+
+		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+		{
+			AttrNumber attnum;
+
+			attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
+			if (isnull)
+				continue;
+			included_cols = bms_add_member(included_cols, attnum);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+		pfree(publications.data);
+		walrcv_clear_result(pubres);
+	}
+
+	/*
+	 * Store the column names only if they are contained in the column filter.
+	 * LogicalRepRelation will only contain attributes corresponding to those
+	 * specified in column filters.
+	 */
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
-			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		char	   *rel_colname;
+		AttrNumber	attnum;
+
+		attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+		if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+			continue;
+
+		rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+		lrel->attnames[natt] = rel_colname;
+		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
@@ -791,12 +875,13 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		ExecClearTuple(slot);
 	}
+
 	ExecDropSingleTupleTableSlot(slot);
+	walrcv_clear_result(res);
+	pfree(cmd.data);
 
 	lrel->natts = natt;
 
-	walrcv_clear_result(res);
-	pfree(cmd.data);
 }
 
 /*
@@ -829,8 +914,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..34df5d4956 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *columns);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,13 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+
+	/*
+	 * Set of columns included in the publication, or NULL if all columns are
+	 * included implicitly.  Note that the attnums in this set are not
+	 * shifted by FirstLowInvalidHeapAttributeNumber.
+	 */
+	Bitmapset  *columns;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+						Bitmapset *columns)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
+		/* Skip if attribute is not present in column filter. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, columns);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid			ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
+		entry->columns = NULL;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
 	}
@@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1192,8 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
-
 				/*
 				 * For a partition, check if any of the ancestors are
 				 * published.  If so, note down the topmost ancestor that is
@@ -1219,6 +1236,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1239,15 +1257,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				Oid			relid;
+				HeapTuple	pub_rel_tuple;
+
+				relid = ancestor_published ? ancestor_id : publish_as_relid;
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+												ObjectIdGetDatum(relid),
+												ObjectIdGetDatum(pub->oid));
+
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					Datum		pub_rel_cols;
+					bool		isnull;
+
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+												   pub_rel_tuple,
+												   Anum_pg_publication_rel_prattrs,
+												   &isnull);
+					if (!isnull)
+					{
+						ArrayType  *arr;
+						int			nelems;
+						int16	   *elems;
+
+						arr = DatumGetArrayTypeP(pub_rel_cols);
+						nelems = ARR_DIMS(arr)[0];
+						elems = (int16 *) ARR_DATA_PTR(arr);
+
+						/* XXX is there a danger of memory leak here? beware */
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						for (int i = 0; i < nelems; i++)
+							entry->columns = bms_add_member(entry->columns,
+															elems[i]);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
-
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1343,6 +1393,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->columns);
+		entry->columns = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index b52f3ccda2..d98b1b50c4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4034,6 +4034,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prattrs;
 	int			i,
 				j,
 				ntups;
@@ -4045,8 +4046,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, ", prattrs");
+	else
+		appendPQExpBufferStr(query, ", NULL as prattrs");
+	appendPQExpBufferStr(query,
+						 " FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4055,6 +4061,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prattrs = PQfnumber(res, "prattrs");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4096,6 +4103,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
 
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+							  &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			pubrinfo[j].pubrattrs = attribs->data;
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
 
@@ -4160,10 +4189,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
-					  fmtQualifiedDumpable(tbinfo));
+	appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrattrs)
+		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index f011ace8a8..3f7500accc 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrattrs;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c28788e84f..b9d0ebf762 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5815,7 +5815,7 @@ listPublications(const char *pattern)
  */
 static bool
 addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
-						   bool singlecol, printTableContent *cont)
+						   bool as_schema, printTableContent *cont)
 {
 	PGresult   *res;
 	int			count = 0;
@@ -5832,10 +5832,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 
 	for (i = 0; i < count; i++)
 	{
-		if (!singlecol)
+		if (!as_schema)			/* as table */
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
-		else
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+		}
+		else					/* as schema */
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
 		printTableAddFooter(cont, buf->data);
@@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname, \n");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 "       CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+									 "       pg_catalog.array_to_string"
+									 "(ARRAY(SELECT attname\n"
+									 "         FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
+									 "              pg_catalog.pg_attribute\n"
+									 "        WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+									 "       ELSE NULL END AS columns");
+			else
+				appendPQExpBufferStr(&buf, "NULL as columns");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index cf30239f6d..25c7c08040 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..500991e696 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	   *columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
@@ -109,6 +110,8 @@ typedef enum PublicationPartOpt
 } PublicationPartOpt;
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetRelationColumnPartialPublications(Oid relid);
+extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 extern List *GetPublicationSchemas(Oid pubid);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..7ad285faae 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	int2vector	prattrs;		/* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..02b547d044 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..7a5cb9871d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..ae99b99cc6 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,22 @@ Publications:
  regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ERROR:  cannot drop column "c" because it is part of publication "testpub_fortable"
+HINT:  Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
 CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -669,6 +684,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
 Tables from schemas:
     "pub_test1"
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR:  syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+                                                                ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR:  column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+                                                             ^
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..b422e3e374 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,17 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 \d+ testpub_tbl2
 \dRp+ testpub_foralltables
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 
 CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +372,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..dfae6d8eac
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,164 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test column filtering in logical replication
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+	INSERT INTO tab2 VALUES (2, 'foo', 2);");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column tab1.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column tab3.b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column tab1.c is not replicated');
+
+# Verify user-defined types
+$node_publisher->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+	ALTER PUBLICATION pub1 ADD TABLE test_tab4 (a, b, d);
+	});
+$node_subscriber->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, d text);
+	});
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab4 VALUES (1, 'red', 3, 'oh my');");
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 1");
+is($result, qq(1|abc), 'insert on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 2");
+is($result, qq(2|foo), 'update on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_tab4");
+is($result, qq(1|red|oh my), 'insert on table with user-defined type');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d)");
+$node_subscriber->safe_psql('postgres',    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5;"),
+   qq(1|11|1111
+2|22|2222),
+   'overlapping publications with overlapping column lists');

Reply via email to