On 2021-Dec-29, Alvaro Herrera wrote:

> This new stuff is not yet finished.  For example I didn't refactor
> handling of REPLICA IDENTITY, so the new command does not correctly
> check everything, such as the REPLICA IDENTITY FULL stuff.  Also, no
> tests have been added yet.  In manual tests it seems to behave as
> expected.

Fixing the lack of check for replica identity full didn't really require
much refactoring, so I did it that way.

I split it with some trivial fixes that can be committed separately
ahead of time.  I'm thinking of committing 0001 later today, perhaps
0002 tomorrow.  The interesting part is 0003.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
>From 0453eb6397803ce4dd607fd3a17a12d573eb2c90 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 30 Dec 2021 16:48:09 -0300
Subject: [PATCH v13 1/3] Small cleanups for publicationcmds.c and
 pg_publication.c

This fixes a typo in a local function name, completes an existing comment,
and renames an unhappily named local variable.
---
 src/backend/catalog/pg_publication.c   | 11 ++++++-----
 src/backend/commands/publicationcmds.c | 16 +++++++++-------
 src/backend/commands/tablecmds.c       |  3 +--
 src/backend/parser/gram.y              |  5 +++--
 4 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..b307bc2ed5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -287,7 +287,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
 	Oid			relid = RelationGetRelid(targetrel->relation);
-	Oid			prrelid;
+	Oid			pubreloid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
@@ -320,9 +320,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
 
-	prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
-								 Anum_pg_publication_rel_oid);
-	values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
+	pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
+								   Anum_pg_publication_rel_oid);
+	values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
 	values[Anum_pg_publication_rel_prpubid - 1] =
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
@@ -334,7 +334,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	CatalogTupleInsert(rel, tup);
 	heap_freetuple(tup);
 
-	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
+	/* Register dependencies as needed */
+	ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
 
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..3466c57dc0 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -48,7 +48,7 @@
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
-static List *OpenReliIdList(List *relids);
+static List *OpenRelIdList(List *relids);
 static List *OpenTableList(List *tables);
 static void CloseTableList(List *rels);
 static void LockSchemaList(List *schemalist);
@@ -499,8 +499,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	Oid			pubid = pubform->oid;
 
 	/*
-	 * It is quite possible that for the SET case user has not specified any
-	 * tables in which case we need to remove all the existing tables.
+	 * Nothing to do if no objects, except in SET: for that it is quite
+	 * possible that user has not specified any schemas in which case we need
+	 * to remove all the existing schemas.
 	 */
 	if (!tables && stmt->action != DEFELEM_SET)
 		return;
@@ -593,8 +594,9 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 
 	/*
-	 * It is quite possible that for the SET case user has not specified any
-	 * schemas in which case we need to remove all the existing schemas.
+	 * Nothing to do if no objects, except in SET: for that it is quite
+	 * possible that user has not specified any schemas in which case we need
+	 * to remove all the existing schemas.
 	 */
 	if (!schemaidlist && stmt->action != DEFELEM_SET)
 		return;
@@ -610,7 +612,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		List	   *reloids;
 
 		reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
-		rels = OpenReliIdList(reloids);
+		rels = OpenRelIdList(reloids);
 
 		CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
 											  PUBLICATIONOBJ_TABLE_IN_SCHEMA);
@@ -868,7 +870,7 @@ RemovePublicationSchemaById(Oid psoid)
  * add them to a publication.
  */
 static List *
-OpenReliIdList(List *relids)
+OpenRelIdList(List *relids)
 {
 	ListCell   *lc;
 	List	   *rels = NIL;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 45e59e3d5c..3631b8a929 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,8 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
 #include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -15626,7 +15626,6 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	else
 		elog(ERROR, "unexpected identity type %u", stmt->identity_type);
 
-
 	/* Check that the index exists */
 	indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace);
 	if (!OidIsValid(indexOid))
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2a319eecda..4415ba00fa 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -17433,8 +17433,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
-				pubtable->relation = makeRangeVar(NULL, pubobj->name,
-												  pubobj->location);
+
+				pubtable->relation =
+					makeRangeVar(NULL, pubobj->name, pubobj->location);
 				pubobj->pubtable = pubtable;
 				pubobj->name = NULL;
 			}
-- 
2.30.2

>From b30bba79a8bf7cef972d45d0e5a3bdd27d555cd0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 30 Dec 2021 16:52:04 -0300
Subject: [PATCH v13 2/3] change use of DEFELEM enum to a new one

---
 src/backend/commands/publicationcmds.c | 18 +++++++++---------
 src/backend/parser/gram.y              |  6 +++---
 src/include/nodes/parsenodes.h         |  9 ++++++++-
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 3466c57dc0..f63132d2ba 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -503,12 +503,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	 * possible that user has not specified any schemas in which case we need
 	 * to remove all the existing schemas.
 	 */
-	if (!tables && stmt->action != DEFELEM_SET)
+	if (!tables && stmt->action != AP_SetObjects)
 		return;
 
 	rels = OpenTableList(tables);
 
-	if (stmt->action == DEFELEM_ADD)
+	if (stmt->action == AP_AddObjects)
 	{
 		List	   *schemas = NIL;
 
@@ -521,9 +521,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 											  PUBLICATIONOBJ_TABLE);
 		PublicationAddTables(pubid, rels, false, stmt);
 	}
-	else if (stmt->action == DEFELEM_DROP)
+	else if (stmt->action == AP_DropObjects)
 		PublicationDropTables(pubid, rels, false);
-	else						/* DEFELEM_SET */
+	else						/* AP_SetObjects */
 	{
 		List	   *oldrelids = GetPublicationRelations(pubid,
 														PUBLICATION_PART_ROOT);
@@ -598,7 +598,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 	 * possible that user has not specified any schemas in which case we need
 	 * to remove all the existing schemas.
 	 */
-	if (!schemaidlist && stmt->action != DEFELEM_SET)
+	if (!schemaidlist && stmt->action != AP_SetObjects)
 		return;
 
 	/*
@@ -606,7 +606,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 	 * concurrent schema deletion.
 	 */
 	LockSchemaList(schemaidlist);
-	if (stmt->action == DEFELEM_ADD)
+	if (stmt->action == AP_AddObjects)
 	{
 		List	   *rels;
 		List	   *reloids;
@@ -620,9 +620,9 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		CloseTableList(rels);
 		PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
 	}
-	else if (stmt->action == DEFELEM_DROP)
+	else if (stmt->action == AP_DropObjects)
 		PublicationDropSchemas(pubform->oid, schemaidlist, false);
-	else						/* DEFELEM_SET */
+	else if (stmt->action == AP_SetObjects)
 	{
 		List	   *oldschemaids = GetPublicationSchemas(pubform->oid);
 		List	   *delschemas = NIL;
@@ -657,7 +657,7 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
 {
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 
-	if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) &&
+	if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
 		schemaidlist && !superuser())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4415ba00fa..539fb2d03b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9828,7 +9828,7 @@ AlterPublicationStmt:
 					n->pubname = $3;
 					n->pubobjects = $5;
 					preprocess_pubobj_list(n->pubobjects, yyscanner);
-					n->action = DEFELEM_ADD;
+					n->action = AP_AddObjects;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name SET pub_obj_list
@@ -9837,7 +9837,7 @@ AlterPublicationStmt:
 					n->pubname = $3;
 					n->pubobjects = $5;
 					preprocess_pubobj_list(n->pubobjects, yyscanner);
-					n->action = DEFELEM_SET;
+					n->action = AP_SetObjects;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name DROP pub_obj_list
@@ -9846,7 +9846,7 @@ AlterPublicationStmt:
 					n->pubname = $3;
 					n->pubobjects = $5;
 					preprocess_pubobj_list(n->pubobjects, yyscanner);
-					n->action = DEFELEM_DROP;
+					n->action = AP_DropObjects;
 					$$ = (Node *)n;
 				}
 		;
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..ced2835d33 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3674,6 +3674,13 @@ typedef struct CreatePublicationStmt
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
+typedef enum AlterPublicationAction
+{
+	AP_AddObjects,				/* add objects to publication */
+	AP_DropObjects,				/* remove objects from publication */
+	AP_SetObjects				/* set list of objects */
+} AlterPublicationAction;
+
 typedef struct AlterPublicationStmt
 {
 	NodeTag		type;
@@ -3688,7 +3695,7 @@ typedef struct AlterPublicationStmt
 	 */
 	List	   *pubobjects;		/* Optional list of publication objects */
 	bool		for_all_tables; /* Special publication for all tables in db */
-	DefElemAction action;		/* What action to perform with the
+	AlterPublicationAction action; /* What action to perform with the
 								 * tables/schemas */
 } AlterPublicationStmt;
 
-- 
2.30.2

>From 756cc058ca0b6102e273729f87efec34fdd69c00 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 6 Sep 2021 10:34:29 -0300
Subject: [PATCH v13 3/3] Add column filtering to logical replication

Add capability to specify column names while linking
the table to a publication, at the time of CREATE or ALTER
publication. This will allow replicating only the specified
columns. Other columns, if any, on the subscriber will be populated
locally or NULL will be inserted if no value is supplied for the column
by the upstream during INSERT.
This facilitates replication to a table on subscriber
containing only the subscribed/filtered columns.
If no filter is specified, all the columns are replicated.
REPLICA IDENTITY columns are always replicated.
Thus, prohibit adding relation to publication, if column filters
do not contain REPLICA IDENTITY.
Add a tap test for the same in src/test/subscription.

Author: Rahila Syed <rahilasye...@gmail.com>
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektnrh8nj1jf...@mail.gmail.com
---
 doc/src/sgml/protocol.sgml                   |   4 +-
 doc/src/sgml/ref/alter_publication.sgml      |  20 +-
 doc/src/sgml/ref/create_publication.sgml     |  11 +-
 src/backend/catalog/pg_publication.c         | 306 ++++++++++++++++++-
 src/backend/commands/publicationcmds.c       |  65 +++-
 src/backend/commands/tablecmds.c             |  78 ++++-
 src/backend/nodes/copyfuncs.c                |   1 +
 src/backend/nodes/equalfuncs.c               |   1 +
 src/backend/parser/gram.y                    |  60 +++-
 src/backend/replication/logical/proto.c      |  66 ++--
 src/backend/replication/logical/tablesync.c  | 118 ++++++-
 src/backend/replication/pgoutput/pgoutput.c  |  78 ++++-
 src/bin/pg_dump/pg_dump.c                    |  41 ++-
 src/bin/pg_dump/pg_dump.h                    |   1 +
 src/bin/psql/describe.c                      |  26 +-
 src/bin/psql/tab-complete.c                  |   2 +
 src/include/catalog/pg_publication.h         |   5 +
 src/include/catalog/pg_publication_rel.h     |   3 +
 src/include/nodes/parsenodes.h               |   4 +-
 src/include/replication/logicalproto.h       |   6 +-
 src/test/regress/expected/publication.out    |  33 +-
 src/test/regress/sql/publication.sql         |  20 +-
 src/test/subscription/t/028_column_filter.pl | 164 ++++++++++
 23 files changed, 1031 insertions(+), 82 deletions(-)
 create mode 100644 src/test/subscription/t/028_column_filter.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 34a7034282..5bc2e7a591 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6877,7 +6877,9 @@ Relation
 </listitem>
 </varlistentry>
 </variablelist>
-        Next, the following message part appears for each column (except generated columns):
+        Next, the following message part appears for each column (except
+        generated columns and other columns that don't appear in the column
+        filter list, for tables that have one):
 <variablelist>
 <varlistentry>
 <term>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..16a12b44b9 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -25,12 +25,13 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD <replace
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET <replaceable class="parameter">publication_object</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP <replaceable class="parameter">publication_object</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ALTER TABLE <replaceable class="parameter">table_name</replaceable> SET COLUMNS { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | ALL }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -62,6 +63,11 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    command retain their previous settings.
   </para>
 
+  <para>
+   The <literal>ALTER TABLE ... SET COLUMNS</literal> variant changes the set
+   of columns of a table that are included in the publication.
+  </para>
+
   <para>
    The remaining variants change the owner and the name of the publication.
   </para>
@@ -110,6 +116,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      Optionally, a column list can be specified.  See <xref
+      linkend="sql-createpublication"/> for details.
      </para>
     </listitem>
    </varlistentry>
@@ -164,9 +172,15 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
   </para>
 
   <para>
-   Add some tables to the publication:
+   Add tables to the publication:
 <programlisting>
-ALTER PUBLICATION mypublication ADD TABLE users, departments;
+ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
+</programlisting></para>
+
+  <para>
+   Change the set of columns published for a table:
+<programlisting>
+ALTER PUBLICATION mypublication ALTER TABLE users SET COLUMNS (user_id, firstname, lastname);
 </programlisting></para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      When a column list is specified, only the listed columns are replicated;
+      any other columns are ignored for the purpose of replication through
+      this publication.  If no column list is specified, all columns of the
+      table are replicated through this publication, including any columns
+      added later.  If a column list is specified, it must include the replica
+      identity columns.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b307bc2ed5..783ee74c6b 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -45,13 +45,23 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+
+static void check_publication_columns(Relation targetrel, Bitmapset *columns);
+static AttrNumber *publication_translate_columns(Relation targetrel, List *columns,
+												 int *natts, Bitmapset **attset);
+
 /*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * columns is the bitmapset of columns specified as column filter, or NULL if
+ * no column filter was specified.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 {
+	bool		replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +92,62 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/* Make sure the column list checks out */
+	if (columns != NULL)
+	{
+		/*
+		 * Even if the user listed all columns in the column list, we cannot
+		 * allow a column list to be specified when REPLICA IDENTITY is FULL;
+		 * that would cause problems if a new column is added later, because
+		 * that column would have to be included (because of being part of the
+		 * replica identity) but it's technically not allowed (because of not
+		 * being in the publication's column list yet).  So reject this case
+		 * altogether.
+		 */
+		if (replidentfull)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+		check_publication_columns(targetrel, columns);
+	}
+}
+
+/*
+ * Enforce that the column filter can only leave out columns that aren't
+ * forced to be sent.
+ *
+ * Every column of the REPLICA IDENTITY must be in 'columns'; raise an error
+ * otherwise.  REPLICA IDENTITY FULL is not checked here: the callers in
+ * this file reject column filters on such relations beforehand.
+ */
+static void
+check_publication_columns(Relation targetrel, Bitmapset *columns)
+{
+	Bitmapset  *idattrs;
+	int			x;
+
+	idattrs = RelationGetIndexAttrBitmap(targetrel,
+										 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	/*
+	 * We have to test membership the hard way, because the values returned by
+	 * RelationGetIndexAttrBitmap are offset but those in 'columns' are not.
+	 */
+	x = -1;
+	while ((x = bms_next_member(idattrs, x)) >= 0)
+	{
+		if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+	}
+
+	bms_free(idattrs);
+}
 
 /*
@@ -289,6 +355,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			pubreloid;
 	Publication *pub = GetPublication(pubid);
+	Bitmapset  *attset = NULL;
+	AttrNumber *attarray;
+	int			natts = 0;
 	ObjectAddress myself,
 				referenced;
 	List	   *relids = NIL;
@@ -314,7 +383,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	/* Translate column names to numbers and verify suitability */
+	attarray = publication_translate_columns(targetrel->relation,
+											 targetrel->columns,
+											 &natts, &attset);
+
+	check_publication_add_relation(targetrel->relation, attset);
+
+	bms_free(attset);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -327,6 +403,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	if (targetrel->columns)
+	{
+		int2vector *prattrs;
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+	}
+	else
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -337,6 +422,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	/* Register dependencies as needed */
 	ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
 
+	/* Add dependency on the columns, if any are listed */
+	for (int i = 0; i < natts; i++)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+	}
+	pfree(attarray);
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
@@ -364,6 +456,144 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	return myself;
 }
 
+/*
+ * Update the column list for a relation in a publication; a NULL list
+ * removes the filter.  Dependencies of the pg_publication_rel entry on the
+ * table are rebuilt: the whole-table one plus one per listed column.
+ */
+void
+publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+							  Relation targetrel, List *columns)
+{
+	Oid			pubreloid = ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid;
+	Oid			relid = RelationGetRelid(targetrel);
+	Bitmapset  *attset;
+	AttrNumber *attarray;
+	HeapTuple	copytup;
+	int			natts;
+	ObjectAddress myself,
+				referenced;
+	bool		nulls[Natts_pg_publication_rel];
+	bool		replaces[Natts_pg_publication_rel];
+	Datum		values[Natts_pg_publication_rel];
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_publication_rel_prattrs - 1] = true;
+
+	/* Start over with dependencies on the table and its columns */
+	ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
+	deleteDependencyRecordsForClass(PublicationRelRelationId, pubreloid,
+									RelationRelationId, DEPENDENCY_AUTO);
+	ObjectAddressSet(referenced, RelationRelationId, relid);
+	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+	if (columns == NULL)
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
+	else
+	{
+		int2vector *prattrs;
+
+		if (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot change column set for relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+		attarray = publication_translate_columns(targetrel, columns,
+												 &natts, &attset);
+
+		/*
+		 * Make sure the column list checks out.  XXX this should occur at
+		 * caller in publicationcmds.c, not here.
+		 */
+		check_publication_columns(targetrel, attset);
+		bms_free(attset);
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+
+		/* Add dependencies on the new list of columns */
+		for (int i = 0; i < natts; i++)
+		{
+			ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+			recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+		}
+	}
+
+	copytup = heap_modify_tuple(pubreltup, RelationGetDescr(pubrel),
+								values, nulls, replaces);
+
+	CatalogTupleUpdate(pubrel, &pubreltup->t_self, copytup);
+	heap_freetuple(copytup);
+}
+
+/*
+ * Translate a list of column names to an array of attribute numbers
+ * and a Bitmapset with them; verify that each attribute is appropriate
+ * to have in a publication column list.  Other checks are done later;
+ * see check_publication_columns.  The palloc'd array is returned, and
+ * *natts and *attset receive its length and the set, respectively.
+ * Note that the attribute numbers are *not* offset by
+ * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
+ * is okay.
+ */
+static AttrNumber *
+publication_translate_columns(Relation targetrel, List *columns, int *natts,
+							  Bitmapset **attset)
+{
+	AttrNumber *attarray;
+	Bitmapset  *set = NULL;
+	ListCell   *lc;
+	int			n = 0;
+
+	/*
+	 * Translate list of columns to attnums. We prohibit system attributes and
+	 * make sure there are no duplicate columns.  The array keeps the order in
+	 * which the columns appear in the input list.
+	 */
+	attarray = palloc(sizeof(AttrNumber) * list_length(columns));
+	foreach(lc, columns)
+	{
+		char	   *colname = strVal(lfirst(lc));
+		AttrNumber	attnum = get_attnum(RelationGetRelid(targetrel), colname);
+
+		if (attnum == InvalidAttrNumber)
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_COLUMN),
+					errmsg("column \"%s\" of relation \"%s\" does not exist",
+						   colname, RelationGetRelationName(targetrel)));
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("cannot reference system column \"%s\" in publication column list",
+						   colname));
+
+		if (bms_is_member(attnum, set))
+			ereport(ERROR,
+					errcode(ERRCODE_DUPLICATE_OBJECT),
+					errmsg("duplicate column \"%s\" in publication column list",
+						   colname));
+
+		set = bms_add_member(set, attnum);
+		attarray[n++] = attnum;
+	}
+
+	/*
+	 * XXX qsort the array here, or maybe build just the bitmapset above and
+	 * then scan that in order to produce the array?  Do we care about the
+	 * array being unsorted?
+	 */
+
+	*natts = n;
+	*attset = set;
+	return attarray;
+}
+
 /*
  * Insert new publication / schema mapping.
  */
@@ -471,6 +701,74 @@ GetRelationPublications(Oid relid)
 	return result;
 }
 
+/*
+ * Gets a list of OIDs of all column-partial publications of the given
+ * relation, that is, those that specify a column list.
+ */
+List *
+GetRelationColumnPartialPublications(Oid relid)
+{
+	CatCList   *pubrellist;
+	List	   *pubs = NIL;
+
+	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
+									 ObjectIdGetDatum(relid));
+	for (int i = 0; i < pubrellist->n_members; i++)
+	{
+		HeapTuple	tup = &pubrellist->members[i]->tuple;
+		bool		isnull;
+
+		(void) SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							   Anum_pg_publication_rel_prattrs,
+							   &isnull);
+		if (isnull)
+			continue;
+
+		pubs = lappend_oid(pubs,
+						   ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid);
+	}
+
+	ReleaseSysCacheList(pubrellist);
+
+	return pubs;
+}
+
+/*
+ * Return the attribute numbers in the column list of a relation in a
+ * publication; errors out if the list is null.  Result is an OID list.
+ */
+List *
+GetRelationColumnListInPublication(Oid relid, Oid pubid)
+{
+	HeapTuple	tup;
+	Datum		adatum;
+	bool		isnull;
+	ArrayType  *arr;
+	int			nelems;
+	int16	   *elems;
+	List	   *attnos = NIL;
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(pubid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid);
+	adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							 Anum_pg_publication_rel_prattrs, &isnull);
+	if (isnull)
+		elog(ERROR, "found unexpected null in pg_publication_rel.prattrs");
+	arr = DatumGetArrayTypeP(adatum);
+	nelems = ARR_DIMS(arr)[0];
+	elems = (int16 *) ARR_DATA_PTR(arr);
+
+	for (int i = 0; i < nelems; i++)
+		attnos = lappend_oid(attnos, elems[i]);
+
+	ReleaseSysCache(tup);
+
+	return attnos;
+}
+
 /*
  * Gets list of relation oids for a publication.
  *
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f63132d2ba..aefae8b3c4 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -376,6 +376,46 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	return myself;
 }
 
+/*
+ * Change the column list of a relation in a publication.  The relation must
+ * already be a member of the publication; otherwise an error is raised.
+ */
+static void
+PublicationSetColumns(AlterPublicationStmt *stmt,
+					  Form_pg_publication pubform, PublicationTable *table)
+{
+	Relation	rel,
+				urel;
+	HeapTuple	tup;
+	ObjectAddress obj,
+				  secondary;
+
+	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
+	urel = table_openrv(table->relation, ShareUpdateExclusiveLock);
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(RelationGetRelid(urel)),
+						  ObjectIdGetDatum(pubform->oid));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("relation \"%s\" is not already in publication \"%s\"",
+					   table->relation->relname, NameStr(pubform->pubname)));
+
+	publication_set_table_columns(rel, tup, urel, table->columns);
+
+	/* Report the pg_publication_rel row (not the publication) as altered */
+	ObjectAddressSet(obj, PublicationRelRelationId,
+					 ((Form_pg_publication_rel) GETSTRUCT(tup))->oid);
+	ObjectAddressSet(secondary, RelationRelationId, RelationGetRelid(urel));
+	EventTriggerCollectSimpleCommand(obj, secondary, (Node *) stmt);
+
+	ReleaseSysCache(tup);
+	table_close(rel, RowExclusiveLock);
+	table_close(urel, NoLock);	/* NoLock: the table lock is not released here */
+	InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
+}
+
 /*
  * Change options of a publication.
  */
@@ -523,6 +563,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	}
 	else if (stmt->action == AP_DropObjects)
 		PublicationDropTables(pubid, rels, false);
+	else if (stmt->action == AP_SetColumns)
+	{
+		Assert(schemaidlist == NIL);
+		Assert(list_length(tables) == 1);
+
+		PublicationSetColumns(stmt, pubform,
+							  linitial_node(PublicationTable, tables));
+	}
 	else						/* AP_SetObjects */
 	{
 		List	   *oldrelids = GetPublicationRelations(pubid,
@@ -562,7 +610,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				/* This is not needed to delete a table */
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -645,6 +694,10 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		 */
 		PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
 	}
+	else
+	{
+		/* Nothing to do for AP_SetColumns */
+	}
 }
 
 /*
@@ -934,6 +987,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = t->columns;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -967,8 +1022,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = t->columns;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -1076,6 +1134,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		if (pubrel->columns)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
+
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3631b8a929..a9051eb5e7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8347,6 +8347,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 				 bool missing_ok, LOCKMODE lockmode,
 				 ObjectAddresses *addrs)
 {
+	Oid			relid = RelationGetRelid(rel);
 	HeapTuple	tuple;
 	Form_pg_attribute targetatt;
 	AttrNumber	attnum;
@@ -8366,7 +8367,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	/*
 	 * get the number of the attribute
 	 */
-	tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+	tuple = SearchSysCacheAttName(relid, colName);
 	if (!HeapTupleIsValid(tuple))
 	{
 		if (!missing_ok)
@@ -8420,13 +8421,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * Also, if the column is used in the column list of a publication,
+	 * disallow the drop if the DROP is RESTRICT.  We don't do anything if the
+	 * DROP is CASCADE, which means that the dependency mechanism will remove
+	 * the relation from the publication.
+	 */
+	if (behavior == DROP_RESTRICT)
+	{
+		List	   *pubs;
+		ListCell   *lc;
+
+		pubs = GetRelationColumnPartialPublications(relid);
+		foreach(lc, pubs)
+		{
+			Oid		pubid = lfirst_oid(lc);
+			List   *published_cols;
+
+			published_cols =
+				GetRelationColumnListInPublication(relid, pubid);
+
+			if (list_member_oid(published_cols, attnum))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+						errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"",
+							   colName, get_publication_name(pubid, false)),
+						errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication."));
+		}
+	}
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
 	children =
-		find_inheritance_children(RelationGetRelid(rel), lockmode);
+		find_inheritance_children(relid, lockmode);
 
 	if (children)
 	{
@@ -8514,7 +8544,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	/* Add object to delete */
 	object.classId = RelationRelationId;
-	object.objectId = RelationGetRelid(rel);
+	object.objectId = relid;
 	object.objectSubId = attnum;
 	add_exact_object_address(&object, addrs);
 
@@ -15603,6 +15633,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	Oid			indexOid;
 	Relation	indexRel;
 	int			key;
+	List	   *pubs;
+	Bitmapset  *indexed_cols = NULL;
+	ListCell   *lc;
+
+	pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel));
 
 	if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
 	{
@@ -15611,11 +15646,16 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
 	{
+		if (pubs != NIL)
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot set REPLICA IDENTITY FULL when column-partial publications exist"));
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
 	{
+		/* XXX not sure what's the right check for publications here */
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
@@ -15700,6 +15740,38 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 					 errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
 							RelationGetRelationName(indexRel),
 							NameStr(attr->attname))));
+
+		/*
+		 * Collect columns used, in case we have any publications that we need
+		 * to vet.  System attributes are disallowed so no need to subtract
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		indexed_cols = bms_add_member(indexed_cols, attno);
+	}
+
+	/*
+	 * Check column-partial publications.  All publications have to include all
+	 * key columns of the new index.
+	 */
+	foreach(lc, pubs)
+	{
+		Oid			pubid = lfirst_oid(lc);
+		List	   *published_cols;
+
+		published_cols =
+			GetRelationColumnListInPublication(RelationGetRelid(rel), pubid);
+
+		for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
+		{
+			int16	attno = indexRel->rd_index->indkey.values[key];
+
+			if (!list_member_oid(published_cols, attno))
+				ereport(ERROR,
+						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns",
+							   RelationGetRelationName(indexRel),
+							   get_publication_name(pubid, false)));
+		}
 	}
 
 	/* This index is suitable for use as a replica identity. Mark it. */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 539fb2d03b..150c23df1b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9740,12 +9740,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->columns = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9760,28 +9761,38 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2 != NULL)
+					{
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->columns = $2;
+						$$->name = NULL;
+					}
+					else
+						$$->name = $1;
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->columns = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->columns = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -9807,6 +9818,9 @@ pub_obj_list: 	PublicationObjSpec
  *
  * ALTER PUBLICATION name SET pub_obj [, ...]
  *
+ * ALTER PUBLICATION name ALTER TABLE table_name SET COLUMNS (column[, ...])
+ * ALTER PUBLICATION name ALTER TABLE table_name SET COLUMNS ALL
+ *
  * pub_obj is one of:
  *
  *		TABLE table_name [, ...]
@@ -9840,6 +9854,32 @@ AlterPublicationStmt:
 					n->action = AP_SetObjects;
 					$$ = (Node *)n;
 				}
+			| ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')'
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+					obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+					obj->pubtable = makeNode(PublicationTable);
+					obj->pubtable->relation = $6;
+					obj->pubtable->columns = $10;
+					n->pubname = $3;
+					n->pubobjects = list_make1(obj);
+					n->action = AP_SetColumns;
+					$$ = (Node *) n;
+				}
+			| ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS ALL
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+					obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+					obj->pubtable = makeNode(PublicationTable);
+					obj->pubtable->relation = $6;
+					obj->pubtable->columns = NIL;
+					n->pubname = $3;
+					n->pubobjects = list_make1(obj);
+					n->action = AP_SetColumns;
+					$$ = (Node *) n;
+				}
 			| ALTER PUBLICATION name DROP pub_obj_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
@@ -17443,6 +17483,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/*
+			 * This can happen if a column list is specified in a continuation
+			 * for a schema entry; reject it.
+			 */
+			if (pubobj->pubtable)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("column specification not allowed for schemas"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..3428984130 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,11 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary,
+								   Bitmapset *columns);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, columns);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+					 Bitmapset *columns)
 {
 	char	   *relname;
 
@@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, columns);
 }
 
 /*
@@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple,
+					   bool binary, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 
 	for (i = 0; i < desc->natts; i++)
 	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		/* Don't count attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
 			continue;
 		nliveatts++;
 	}
@@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/* Ignore attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		if (isnull[i])
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
-	/* send number of live attributes */
-	for (i = 0; i < desc->natts; i++)
-	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
-			continue;
-		nliveatts++;
-	}
-	pq_sendint16(out, nliveatts);
-
 	/* fetch bitmap of REPLICATION IDENTITY attributes */
 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 	if (!replidentfull)
 		idattrs = RelationGetIdentityKeyBitmap(rel);
 
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 		if (att->attisdropped || att->attgenerated)
 			continue;
-
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..1303e85851 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -697,17 +698,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		am_partition = false;
+	Bitmapset  *included_cols = NULL;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,14 +741,18 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull));	/* relispartition is fetched as BOOLOID */
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
 
-	/* Now fetch columns. */
+	/*
+	 * Now fetch column names and types.
+	 */
 	resetStringInfo(&cmd);
 	appendStringInfo(&cmd,
-					 "SELECT a.attname,"
+					 "SELECT a.attnum,"
+					 "       a.attname,"
 					 "       a.atttypid,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
@@ -772,16 +780,92 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
 
+	/*
+	 * In server versions 15 and higher, obtain the applicable column filter,
+	 * if any.
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		WalRcvExecResult *pubres;
+		TupleTableSlot *slot;
+		Oid			attrsRow[] = {INT2OID};
+		StringInfoData publications;
+		bool		first = true;
+
+		initStringInfo(&publications);
+		foreach(lc, MySubscription->publications)
+		{
+			if (!first)
+				appendStringInfo(&publications, ", ");
+			appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc))));
+			first = false;
+		}
+
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "  SELECT pg_catalog.unnest(prattrs)\n"
+						 "    FROM pg_catalog.pg_publication p JOIN\n"
+						 "         pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n"
+						 "   WHERE p.pubname IN (%s) AND\n",
+						 publications.data);
+		if (!am_partition)
+			appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid);
+		else
+			appendStringInfo(&cmd,
+							 "prrelid IN (SELECT relid\n"
+							 "    FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+							 lrel->remoteid);
+
+		pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+							 lengthof(attrsRow), attrsRow);
+
+		if (pubres->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s",
+							nspname, relname, pubres->err)));
+
+		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+		{
+			AttrNumber attnum;
+
+			attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
+			if (isnull)
+				continue;
+			included_cols = bms_add_member(included_cols, attnum);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+		pfree(publications.data);
+		walrcv_clear_result(pubres);
+	}
+
+	/*
+	 * Store the column names only if they are contained in the column filter.
+	 * LogicalRepRelation will only contain attributes corresponding to those
+	 * specified in column filters.
+	 */
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
-			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		char	   *rel_colname;
+		AttrNumber	attnum;
+
+		attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+		if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+			continue;
+
+		rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+		lrel->attnames[natt] = rel_colname;
+		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
@@ -791,12 +875,13 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		ExecClearTuple(slot);
 	}
+
 	ExecDropSingleTupleTableSlot(slot);
+	walrcv_clear_result(res);
+	pfree(cmd.data);
 
 	lrel->natts = natt;
 
-	walrcv_clear_result(res);
-	pfree(cmd.data);
 }
 
 /*
@@ -829,8 +914,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..34df5d4956 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *columns);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,13 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+
+	/*
+	 * Set of columns included in the publication, or NULL if all columns are
+	 * included implicitly.  Note that the attnums in this list are not
+	 * shifted by FirstLowInvalidHeapAttributeNumber.
+	 */
+	Bitmapset  *columns;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+						Bitmapset *columns)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
+		/* Skip if attribute is not present in column filter. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, columns);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid			ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
+		entry->columns = NULL;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
 	}
@@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1192,8 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
-
 				/*
 				 * For a partition, check if any of the ancestors are
 				 * published.  If so, note down the topmost ancestor that is
@@ -1219,6 +1236,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1239,15 +1257,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				Oid			relid;
+				HeapTuple	pub_rel_tuple;
+
+				relid = ancestor_published ? ancestor_id : publish_as_relid;
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+												ObjectIdGetDatum(relid),
+												ObjectIdGetDatum(pub->oid));
+
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					Datum		pub_rel_cols;
+					bool		isnull;
+
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+												   pub_rel_tuple,
+												   Anum_pg_publication_rel_prattrs,
+												   &isnull);
+					if (!isnull)
+					{
+						ArrayType  *arr;
+						int			nelems;
+						int16	   *elems;
+
+						arr = DatumGetArrayTypeP(pub_rel_cols);
+						nelems = ARR_DIMS(arr)[0];
+						elems = (int16 *) ARR_DATA_PTR(arr);
+
+						/* XXX is there a danger of memory leak here? beware */
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						for (int i = 0; i < nelems; i++)
+							entry->columns = bms_add_member(entry->columns,
+															elems[i]);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
-
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1343,6 +1393,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->columns);
+		entry->columns = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 94f1f32558..1f8b965fd5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4033,6 +4033,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prattrs;
 	int			i,
 				j,
 				ntups;
@@ -4044,8 +4045,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid, ");
+	/* prattrs is an int2vector: fetch column names as a real array instead */
+	appendPQExpBufferStr(query, fout->remoteVersion < 150000 ? "NULL AS prattrs" :
+						 "(SELECT pg_catalog.array_agg(a.attname ORDER BY a.attnum)"
+						 " FROM pg_catalog.pg_attribute a WHERE a.attrelid = prrelid"
+						 " AND a.attnum = ANY (prattrs::pg_catalog.int2[])) AS prattrs");
+	appendPQExpBufferStr(query, " FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4054,6 +4060,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prattrs = PQfnumber(res, "prattrs");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4095,6 +4102,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
 
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+							  &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			pubrinfo[j].pubrattrs = attribs->data;	/* j = output slot, i = result row */
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
 
@@ -4159,10 +4188,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
-					  fmtQualifiedDumpable(tbinfo));
+	appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrattrs)
+		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index f011ace8a8..3f7500accc 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrattrs;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c28788e84f..b9d0ebf762 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5815,7 +5815,7 @@ listPublications(const char *pattern)
  */
 static bool
 addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
-						   bool singlecol, printTableContent *cont)
+						   bool as_schema, printTableContent *cont)
 {
 	PGresult   *res;
 	int			count = 0;
@@ -5832,10 +5832,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 
 	for (i = 0; i < count; i++)
 	{
-		if (!singlecol)
+		if (!as_schema)			/* as table */
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
-		else
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+		}
+		else					/* as schema */
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
 		printTableAddFooter(cont, buf->data);
@@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname, \n");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 "       CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+									 "       pg_catalog.array_to_string"
+									 "(ARRAY(SELECT attname\n"
+									 "         FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
+									 "              pg_catalog.pg_attribute\n"
+									 "        WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+									 "       ELSE NULL END AS columns");
+			else
+				appendPQExpBufferStr(&buf, "NULL as columns");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index cf30239f6d..25c7c08040 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..edd4f0c63c 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	   *columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
@@ -109,6 +110,8 @@ typedef enum PublicationPartOpt
 } PublicationPartOpt;
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetRelationColumnPartialPublications(Oid relid);
+extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 extern List *GetPublicationSchemas(Oid pubid);
@@ -127,6 +130,8 @@ extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *tar
 											  bool if_not_exists);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 											bool if_not_exists);
+extern void publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+										  Relation targetrel, List *columns);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..7ad285faae 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	int2vector	prattrs;		/* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ced2835d33..91ea815e14 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 /*
@@ -3678,7 +3679,8 @@ typedef enum AlterPublicationAction
 {
 	AP_AddObjects,				/* add objects to publication */
 	AP_DropObjects,				/* remove objects from publication */
-	AP_SetObjects				/* set list of objects */
+	AP_SetObjects,				/* set list of objects */
+	AP_SetColumns				/* change list of columns for a table */
 } AlterPublicationAction;
 
 typedef struct AlterPublicationStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..7a5cb9871d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..60cb242c5a 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,29 @@ Publications:
  regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ERROR:  cannot drop column "c" because it is part of publication "testpub_fortable"
+HINT:  Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+ERROR:  column list must not be specified in ALTER PUBLICATION ... DROP
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+  ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c);	-- error
+ERROR:  cannot change column set for relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
 CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -669,6 +691,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
 Tables from schemas:
     "pub_test1"
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR:  syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+                                                                ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR:  column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+                                                             ^
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..b625d161cb 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,21 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 \d+ testpub_tbl2
 \dRp+ testpub_foralltables
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+  ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c);	-- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 
 CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +376,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/028_column_filter.pl b/src/test/subscription/t/028_column_filter.pl
new file mode 100644
index 0000000000..dfae6d8eac
--- /dev/null
+++ b/src/test/subscription/t/028_column_filter.pl
@@ -0,0 +1,164 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test column filtering in logical replication publications
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+	INSERT INTO tab2 VALUES (2, 'foo', 2);");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+# Replication is asynchronous: wait for the inserts to be applied first
+$node_publisher->wait_for_catchup('sub1');
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column tab1.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column tab3.b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column tab1.c is not replicated');
+
+# Verify user-defined types
+$node_publisher->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+	ALTER PUBLICATION pub1 ADD TABLE test_tab4 (a, b, d);
+	});
+$node_subscriber->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, d text);
+	});
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab4 VALUES (1, 'red', 3, 'oh my');");
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 1");
+is($result, qq(1|abc), 'insert on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 2");
+is($result, qq(2|foo), 'update on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_tab4");
+is($result, qq(1|red|oh my), 'insert on table with user-defined type');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d)");
+$node_subscriber->safe_psql('postgres',    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5;"),
+   qq(1|11|1111
+2|22|2222),
+   'overlapping publications with overlapping column lists');
-- 
2.30.2

Reply via email to