On Fri, Jun 18, 2021, at 8:40 AM, Amit Kapila wrote:
> I have rebased the patch so that you can try it out. The main thing I
> have done is to remove changes in worker.c and created a specialized
> function to create estate for pgoutput.c as I don't think we need what
> is done in worker.c.
> 
> Euler, do let me know if you are not happy with the change in pgoutput.c?
Amit, thanks for rebasing this patch. I already had a similar rebased patch in
my local tree. A recent patch broke your version v15 so I rebased it.

I like the idea of a simple create_estate_for_relation() function (I fixed an
oversight regarding GetCurrentCommandId(false) because it is used only for
read-only purposes). This patch also replaces all references to version 14
with version 15.

Commit ef948050 made some changes to the snapshot handling. Setting the current
active snapshot might not be required now, but future changes to allow
functions will need it.

As with the previous patches, this series includes commits (0002 and 0003) that
are not intended to be committed. They are provided for testing purposes only.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 9c27d11efd2ac8257cc2664e5da82fd19f012ebc Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 12:07:51 -0300
Subject: [PATCH 1/3] Row filter for logical replication

This feature adds row filters for publication tables. When you define or modify
a publication, you can optionally filter out rows that do not satisfy a WHERE
condition. This allows you to partially replicate a database or set of tables.
The row filter is per table, which means that you can define different row
filters for different tables. A new row filter can be added simply by
specifying the WHERE clause after the table name. The WHERE expression must be
enclosed in parentheses.

The WHERE clause should probably contain only columns that are part of the
primary key or that are covered by REPLICA IDENTITY. Otherwise, DELETEs won't
be replicated. DELETE uses the old row version (which is limited to the primary
key or REPLICA IDENTITY) to evaluate the row filter. INSERT and UPDATE use the
new row version to evaluate the row filter; hence, you can use any column. If
the row filter evaluates to NULL, it is treated as false. For simplicity,
functions are not allowed; this could possibly be addressed in a future patch.
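
The NULL handling described above can be illustrated with a small standalone
sketch. It is not code from this patch: FilterResult, NullableInt,
eval_price_gt_100() and row_is_published() are hypothetical names, the filter
"price > 100" is made up, and the sketch assumes that a column missing from
the old row version reads as NULL.

```c
#include <stdbool.h>

/* Hypothetical three-valued result of evaluating a row filter. */
typedef enum
{
	FILTER_FALSE,
	FILTER_TRUE,
	FILTER_NULL
} FilterResult;

/* Hypothetical nullable integer column value. */
typedef struct
{
	bool		isnull;
	int			value;
} NullableInt;

/* Evaluate "price > 100" with SQL semantics: a NULL input yields NULL. */
static FilterResult
eval_price_gt_100(NullableInt price)
{
	if (price.isnull)
		return FILTER_NULL;
	return price.value > 100 ? FILTER_TRUE : FILTER_FALSE;
}

/* A change is published only if the filter is true; NULL counts as false. */
static bool
row_is_published(FilterResult result)
{
	return result == FILTER_TRUE;
}
```

Under that assumption, a DELETE whose old row version lacks "price" (because
it is not part of the replica identity) yields FILTER_NULL and is not
published, while an INSERT of a row with price = 150 is.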

If you choose to do the initial table synchronization, only data that satisfies
the row filters is sent. If the subscription has several publications in which
a table has been published with different WHERE clauses, rows must satisfy all
expressions to be copied. If the subscriber is a pre-15 version, data
synchronization won't use row filters even if they are defined in the
publisher. Previous versions cannot handle row filters.
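
As a rough illustration of "rows must satisfy all expressions", the sketch
below builds a COPY command whose WHERE clause ANDs the filters together. It
is a standalone approximation, not the tablesync.c code: build_copy_query()
is a hypothetical helper, it uses snprintf() rather than the StringInfo API,
and it assumes the table and column names are already quoted.

```c
#include <stdio.h>

/*
 * Hypothetical helper: write
 *   COPY (SELECT cols FROM table WHERE (f1) AND (f2) ...) TO STDOUT
 * into buf.  Returns 0 on success, or -1 if buf is too small.
 */
static int
build_copy_query(char *buf, size_t buflen, const char *table,
				 const char *cols, const char *const *filters, int nfilters)
{
	size_t		used;
	int			n;

	n = snprintf(buf, buflen, "COPY (SELECT %s FROM %s", cols, table);
	if (n < 0 || (size_t) n >= buflen)
		return -1;
	used = (size_t) n;

	/* The first filter is introduced by WHERE, the remaining ones by AND. */
	for (int i = 0; i < nfilters; i++)
	{
		n = snprintf(buf + used, buflen - used, "%s(%s)",
					 i == 0 ? " WHERE " : " AND ", filters[i]);
		if (n < 0 || (size_t) n >= buflen - used)
			return -1;
		used += (size_t) n;
	}

	n = snprintf(buf + used, buflen - used, ") TO STDOUT");
	if (n < 0 || (size_t) n >= buflen - used)
		return -1;

	return 0;
}
```

Each filter is wrapped in its own parentheses so that an OR inside one filter
cannot change how the filters combine.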

If your publication contains a partitioned table, the parameter
publish_via_partition_root determines whether it uses the partition's row
filter (if the parameter is false, the default) or the partitioned table's row
filter.
---
 doc/src/sgml/catalogs.sgml                  |   8 +
 doc/src/sgml/ref/alter_publication.sgml     |  11 +-
 doc/src/sgml/ref/create_publication.sgml    |  32 ++-
 doc/src/sgml/ref/create_subscription.sgml   |  11 +-
 src/backend/catalog/pg_publication.c        |  50 +++-
 src/backend/commands/publicationcmds.c      | 125 ++++++----
 src/backend/parser/gram.y                   |  24 +-
 src/backend/parser/parse_agg.c              |  10 +
 src/backend/parser/parse_expr.c             |  13 ++
 src/backend/parser/parse_func.c             |   3 +
 src/backend/replication/logical/tablesync.c |  94 +++++++-
 src/backend/replication/pgoutput/pgoutput.c | 241 ++++++++++++++++++--
 src/bin/pg_dump/pg_dump.c                   |  24 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |  15 +-
 src/include/catalog/pg_publication.h        |   9 +-
 src/include/catalog/pg_publication_rel.h    |   6 +
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/parsenodes.h              |  11 +-
 src/include/parser/parse_node.h             |   1 +
 src/test/regress/expected/publication.out   |  34 +++
 src/test/regress/sql/publication.sql        |  23 ++
 src/test/subscription/t/020_row_filter.pl   | 221 ++++++++++++++++++
 23 files changed, 872 insertions(+), 96 deletions(-)
 create mode 100644 src/test/subscription/t/020_row_filter.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f517a7d4af..dbf2f46c00 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6233,6 +6233,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        Reference to relation
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+      <structfield>prqual</structfield> <type>pg_node_tree</type>
+      </para>
+      <para>Expression tree (in <function>nodeToString()</function>
+      representation) for the relation's qualifying condition</para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..ca091aae33 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
@@ -92,7 +92,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the
+      expression. The <replaceable class="parameter">expression</replaceable>
+      is executed with the role used for the replication connection.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index ff82fbca55..5c2b7d0bd2 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -71,6 +71,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       This does not apply to a partitioned table, however.  The partitions of
       a partitioned table are always implicitly considered part of the
       publication, so they are never explicitly added to the publication.
+      If the optional <literal>WHERE</literal> clause is specified, rows that do
+      not satisfy the <replaceable class="parameter">expression</replaceable>
+      will not be published. Note that parentheses are required around the
+      expression. It has no effect on <literal>TRUNCATE</literal> commands.
      </para>
 
      <para>
@@ -131,9 +135,9 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
           on its partitions) contained in the publication will be published
           using the identity and schema of the partitioned table rather than
           that of the individual partitions that are actually changed; the
-          latter is the default.  Enabling this allows the changes to be
-          replicated into a non-partitioned table or a partitioned table
-          consisting of a different set of partitions.
+          latter is the default (<literal>false</literal>).  Enabling this
+          allows the changes to be replicated into a non-partitioned table or a
+          partitioned table consisting of a different set of partitions.
          </para>
 
          <para>
@@ -182,6 +186,14 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    disallowed on those tables.
   </para>
 
+  <para>
+   The <literal>WHERE</literal> clause should probably contain only columns
+   that are part of the primary key or are covered by <literal>REPLICA
+   IDENTITY</literal>; otherwise, <command>DELETE</command> operations will not
+   be replicated. For <command>INSERT</command> and <command>UPDATE</command>
+   operations, any column can be used in the <literal>WHERE</literal> clause.
+  </para>
+
   <para>
    For an <command>INSERT ... ON CONFLICT</command> command, the publication will
    publish the operation that actually results from the command.  So depending
@@ -197,6 +209,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   <para>
    <acronym>DDL</acronym> operations are not published.
   </para>
+
+  <para>
+   The <literal>WHERE</literal> clause expression is executed with the role used
+   for the replication connection.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -209,6 +226,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
 </programlisting>
   </para>
 
+  <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
   <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812beee37..7183700ed9 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -102,7 +102,16 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          <para>
           Specifies whether the existing data in the publications that are
           being subscribed to should be copied once the replication starts.
-          The default is <literal>true</literal>.
+          The default is <literal>true</literal>. If any table in the
+          publications has a <literal>WHERE</literal> clause, rows that do not
+          satisfy the <replaceable class="parameter">expression</replaceable>
+          will not be copied. If the subscription has several publications in
+          which a table has been published with different
+          <literal>WHERE</literal> clauses, rows must satisfy all expressions
+          to be copied. If any table in the publications has a
+          <literal>WHERE</literal> clause, data synchronization does not use it
+          if the subscriber is a <productname>PostgreSQL</productname> version
+          before 15.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 86e415af89..78f5780fb7 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -33,6 +33,9 @@
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -141,18 +144,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState *pstate;
+	ParseNamespaceItem *nsitem;
+	Node	   *whereclause;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -161,7 +166,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	 * duplicates, it's here just to provide nicer error message in common
 	 * case. The real protection is the unique key on the catalog.
 	 */
-	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
+	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(targetrel->relid),
 							  ObjectIdGetDatum(pubid)))
 	{
 		table_close(rel, RowExclusiveLock);
@@ -172,10 +177,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	nsitem = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										   AccessShareLock,
+										   NULL, false, false);
+	addNSItemToQuery(pstate, nsitem, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+									   copyObject(targetrel->whereClause),
+									   EXPR_KIND_PUBLICATION_WHERE,
+									   "PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -187,7 +209,13 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prpubid - 1] =
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
-		ObjectIdGetDatum(relid);
+		ObjectIdGetDatum(targetrel->relid);
+
+	/* Add qualifications, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prqual - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -202,14 +230,20 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
 	/* Add dependency on the relation */
-	ObjectAddressSet(referenced, RelationRelationId, relid);
+	ObjectAddressSet(referenced, RelationRelationId, targetrel->relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the qualifications */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c8e0..9637d3ddba 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -48,8 +48,8 @@
 /* Same as MAXNUMMESSAGES in sinvaladt.c */
 #define MAX_RELCACHE_INVAL_MSGS 4096
 
-static List *OpenTableList(List *tables);
-static void CloseTableList(List *rels);
+static List *OpenTableList(List *tables, bool is_drop);
+static void CloseTableList(List *rels, bool is_drop);
 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 								 AlterPublicationStmt *stmt);
 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
@@ -232,9 +232,9 @@ CreatePublication(CreatePublicationStmt *stmt)
 
 		Assert(list_length(stmt->tables) > 0);
 
-		rels = OpenTableList(stmt->tables);
+		rels = OpenTableList(stmt->tables, false);
 		PublicationAddTables(puboid, rels, true, NULL);
-		CloseTableList(rels);
+		CloseTableList(rels, false);
 	}
 
 	table_close(rel, RowExclusiveLock);
@@ -372,7 +372,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
-	rels = OpenTableList(stmt->tables);
+	if (stmt->tableAction == DEFELEM_DROP)
+		rels = OpenTableList(stmt->tables, true);
+	else
+		rels = OpenTableList(stmt->tables, false);
 
 	if (stmt->tableAction == DEFELEM_ADD)
 		PublicationAddTables(pubid, rels, false, stmt);
@@ -385,31 +388,24 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 		List	   *delrels = NIL;
 		ListCell   *oldlc;
 
-		/* Calculate which relations to drop. */
+		/*
+		 * Remove all publication-table mappings.  We could possibly remove (i)
+		 * tables that are not found in the new table list and (ii) tables that
+		 * are being re-added with a different qual expression. For (ii),
+		 * simply updating the existing tuple is not enough, because of qual
+		 * expression dependencies.
+		 */
 		foreach(oldlc, oldrelids)
 		{
 			Oid			oldrelid = lfirst_oid(oldlc);
-			ListCell   *newlc;
-			bool		found = false;
+			PublicationRelationInfo *oldrel;
 
-			foreach(newlc, rels)
-			{
-				Relation	newrel = (Relation) lfirst(newlc);
-
-				if (RelationGetRelid(newrel) == oldrelid)
-				{
-					found = true;
-					break;
-				}
-			}
-
-			if (!found)
-			{
-				Relation	oldrel = table_open(oldrelid,
-												ShareUpdateExclusiveLock);
-
-				delrels = lappend(delrels, oldrel);
-			}
+			oldrel = palloc(sizeof(PublicationRelationInfo));
+			oldrel->relid = oldrelid;
+			oldrel->whereClause = NULL;
+			oldrel->relation = table_open(oldrel->relid,
+										  ShareUpdateExclusiveLock);
+			delrels = lappend(delrels, oldrel);
 		}
 
 		/* And drop them. */
@@ -421,10 +417,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 		 */
 		PublicationAddTables(pubid, rels, true, stmt);
 
-		CloseTableList(delrels);
+		CloseTableList(delrels, false);
 	}
 
-	CloseTableList(rels);
+	CloseTableList(rels, false);
 }
 
 /*
@@ -500,26 +496,42 @@ RemovePublicationRelById(Oid proid)
 
 /*
  * Open relations specified by a RangeVar list.
+ * AlterPublicationStmt->tables has a different list element, hence, is_drop
+ * indicates if it has a RangeVar (true) or PublicationTable (false).
  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
  * add them to a publication.
  */
 static List *
-OpenTableList(List *tables)
+OpenTableList(List *tables, bool is_drop)
 {
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationInfo *pri;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
-		bool		recurse = rv->inh;
+		PublicationTable *t = NULL;
+		RangeVar   *rv;
+		bool		recurse;
 		Relation	rel;
 		Oid			myrelid;
 
+		if (is_drop)
+		{
+			rv = castNode(RangeVar, lfirst(lc));
+		}
+		else
+		{
+			t = lfirst(lc);
+			rv = castNode(RangeVar, t->relation);
+		}
+
+		recurse = rv->inh;
+
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
 
@@ -538,8 +550,12 @@ OpenTableList(List *tables)
 			table_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-
-		rels = lappend(rels, rel);
+		pri = palloc(sizeof(PublicationRelationInfo));
+		pri->relid = myrelid;
+		pri->relation = rel;
+		if (!is_drop)
+			pri->whereClause = t->whereClause;
+		rels = lappend(rels, pri);
 		relids = lappend_oid(relids, myrelid);
 
 		/*
@@ -572,7 +588,13 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				pri = palloc(sizeof(PublicationRelationInfo));
+				pri->relid = childrelid;
+				pri->relation = rel;
+				/* child inherits WHERE clause from parent */
+				if (!is_drop)
+					pri->whereClause = t->whereClause;
+				rels = lappend(rels, pri);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -587,16 +609,28 @@ OpenTableList(List *tables)
  * Close all relations in the list.
  */
 static void
-CloseTableList(List *rels)
+CloseTableList(List *rels, bool is_drop)
 {
 	ListCell   *lc;
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		if (is_drop)
+		{
+			Relation    rel = (Relation) lfirst(lc);
 
-		table_close(rel, NoLock);
+			table_close(rel, NoLock);
+		}
+		else
+		{
+			PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc);
+
+			table_close(pri->relation, NoLock);
+		}
 	}
+
+	if (!is_drop)
+		list_free_deep(rels);
 }
 
 /*
@@ -612,15 +646,15 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(pri->relid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(pri->relation->rd_rel->relkind),
+						   RelationGetRelationName(pri->relation));
 
-		obj = publication_add_relation(pubid, rel, if_not_exists);
+		obj = publication_add_relation(pubid, pri, if_not_exists);
 		if (stmt)
 		{
 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
@@ -644,11 +678,10 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
-							   ObjectIdGetDatum(relid),
+							   ObjectIdGetDatum(pri->relid),
 							   ObjectIdGetDatum(pubid));
 		if (!OidIsValid(prid))
 		{
@@ -658,7 +691,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(pri->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb24195438..d82ea003db 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
 				vacuum_relation_list opt_vacuum_relation_list
-				drop_option_list
+				drop_option_list publication_table_list
 
 %type <node>	opt_routine_body
 %type <groupclause> group_clause
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9612,7 +9612,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9643,7 +9643,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9651,7 +9651,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9669,6 +9669,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index a25f8d5b98..5619ac6904 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -551,6 +551,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in COPY FROM WHERE conditions");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE expressions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE expressions");
+
+			break;
 
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
@@ -950,6 +957,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index f928c32311..fc4170e723 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -119,6 +119,13 @@ transformExprRecurse(ParseState *pstate, Node *expr)
 	/* Guard against stack overflow due to overly complex expressions */
 	check_stack_depth();
 
+	/* Functions are not allowed in publication WHERE clauses */
+	if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && nodeTag(expr) == T_FuncCall)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("functions are not allowed in publication WHERE expressions"),
+				 parser_errposition(pstate, exprLocation(expr))));
+
 	switch (nodeTag(expr))
 	{
 		case T_ColumnRef:
@@ -509,6 +516,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref)
 		case EXPR_KIND_COPY_WHERE:
 		case EXPR_KIND_GENERATED_COLUMN:
 		case EXPR_KIND_CYCLE_MARK:
+		case EXPR_KIND_PUBLICATION_WHERE:
 			/* okay */
 			break;
 
@@ -1769,6 +1777,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("cannot use subquery in column generation expression");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE expression");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3089,6 +3100,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "GENERATED AS";
 		case EXPR_KIND_CYCLE_MARK:
 			return "CYCLE";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 3cec8de7da..e946f17c64 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2655,6 +2655,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 682c107e74..980826a502 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -691,19 +691,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 /*
  * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in COPY command.
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel)
+						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		first;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -799,6 +803,55 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->natts = natt;
 
 	walrcv_clear_result(res);
+
+	/* Get relation qual */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT pg_get_expr(prqual, prrelid) "
+						 "  FROM pg_publication p "
+						 "  INNER JOIN pg_publication_rel pr "
+						 "       ON (p.oid = pr.prpubid) "
+						 " WHERE pr.prrelid = %u "
+						 "   AND p.pubname IN (", lrel->remoteid);
+
+		first = true;
+		foreach(lc, MySubscription->publications)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+
+			appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+		}
+		appendStringInfoChar(&cmd, ')');
+
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s",
+							nspname, relname, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			Datum		rf = slot_getattr(slot, 1, &isnull);
+
+			if (!isnull)
+				*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+
 	pfree(cmd.data);
 }
 
@@ -812,6 +865,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
+	List	   *qual = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -820,7 +874,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel);
+							RelationGetRelationName(rel), &lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -829,16 +883,23 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/* List of columns for COPY */
+	attnamelist = make_copy_attnamelist(relmapentry);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	if (lrel.relkind == RELKIND_RELATION)
+
+	/* Regular table with no row filter */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	else
 	{
 		/*
 		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
-		 * do SELECT * because we need to not copy generated columns.
+		 * do SELECT * because we need to not copy generated columns. For
+		 * tables with any row filters, build a SELECT query with AND'ed row
+		 * filters for COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
@@ -847,8 +908,29 @@ copy_table(Relation rel)
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
-		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+		appendStringInfo(&cmd, " FROM %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		/* list of AND'ed filters */
+		if (qual != NIL)
+		{
+			ListCell   *lc;
+			bool		first = true;
+
+			appendStringInfoString(&cmd, " WHERE ");
+			foreach(lc, qual)
+			{
+				char	   *q = strVal(lfirst(lc));
+
+				if (first)
+					first = false;
+				else
+					appendStringInfoString(&cmd, " AND ");
+				appendStringInfoString(&cmd, q);
+			}
+			list_free_deep(qual);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index abd5217ab1..10f85365fc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,18 +13,26 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "access/xact.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
 #include "commands/defrem.h"
+#include "executor/executor.h"
 #include "fmgr.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_coerce.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -61,6 +69,11 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr commit_lsn);
+static EState *create_estate_for_relation(Relation rel);
+static ExprState *pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate);
+static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple,
+								HeapTuple newtuple, List *rowfilter);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -99,6 +112,7 @@ typedef struct RelationSyncEntry
 
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List	   *qual;
 
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
@@ -122,7 +136,7 @@ static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -520,6 +534,148 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	OutputPluginWrite(ctx, false);
 }
 
+static EState *
+create_estate_for_relation(Relation rel)
+{
+	EState			*estate;
+	RangeTblEntry	*rte;
+
+	estate = CreateExecutorState();
+
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
+	rte->rellockmode = AccessShareLock;
+	ExecInitRangeTable(estate, list_make1(rte));
+
+	estate->es_output_cid = GetCurrentCommandId(false);
+
+	return estate;
+}
+
+static ExprState *
+pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate)
+{
+	ExprState  *exprstate;
+	Oid			exprtype;
+	Expr	   *expr;
+
+	/* Prepare expression for execution */
+	exprtype = exprType(rfnode);
+	expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+
+	if (expr == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CANNOT_COERCE),
+				 errmsg("row filter returns type %s that cannot be coerced to the expected type %s",
+						format_type_be(exprtype),
+						format_type_be(BOOLOID)),
+				 errhint("You will need to rewrite the row filter.")));
+
+	exprstate = ExecPrepareExpr(expr, estate);
+
+	return exprstate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+	Datum		ret;
+	bool		isnull;
+
+	Assert(state != NULL);
+
+	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+	elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+		 DatumGetBool(ret) ? "true" : "false",
+		 isnull ? "true" : "false");
+
+	if (isnull)
+		return false;
+
+	return DatumGetBool(ret);
+}
+
+/*
+ * Check the change against the row filter, if any.
+ *
+ * Returns true if the change should be replicated, false otherwise.
+ */
+static bool
+pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, List *rowfilter)
+{
+	TupleDesc	tupdesc;
+	EState	   *estate;
+	ExprContext *ecxt;
+	MemoryContext oldcxt;
+	ListCell   *lc;
+	bool		result = true;
+
+	/* Bail out if there is no row filter */
+	if (rowfilter == NIL)
+		return true;
+
+	elog(DEBUG3, "table \"%s.%s\" has row filter",
+		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
+		 get_rel_name(relation->rd_id));
+
+	tupdesc = RelationGetDescr(relation);
+
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	estate = create_estate_for_relation(relation);
+
+	/* Prepare context per tuple */
+	ecxt = GetPerTupleExprContext(estate);
+	oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+	ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
+	MemoryContextSwitchTo(oldcxt);
+
+	ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false);
+
+	/*
+	 * If the subscription has multiple publications and the same table has a
+	 * different row filter in these publications, all row filters must be
+	 * matched in order to replicate this change.
+	 */
+	foreach(lc, rowfilter)
+	{
+		Node	   *rfnode = (Node *) lfirst(lc);
+		ExprState  *exprstate;
+
+		/* Prepare for expression execution */
+		exprstate = pgoutput_row_filter_prepare_expr(rfnode, estate);
+
+		/* Evaluates row filter */
+		result = pgoutput_row_filter_exec_expr(exprstate, ecxt);
+
+		elog(DEBUG3, "row filter \"%s\" %smatched",
+			 TextDatumGetCString(DirectFunctionCall2(pg_get_expr,
+													 CStringGetTextDatum(nodeToString(rfnode)),
+													 ObjectIdGetDatum(relation->rd_id))),
+			 result ? "" : "not ");
+
+		/* If the tuple does not match one of the row filters, bail out */
+		if (!result)
+			break;
+	}
+
+	/* Cleanup allocated resources */
+	ResetExprContext(ecxt);
+	FreeExecutorState(estate);
+	PopActiveSnapshot();
+
+	return result;
+}
+
 /*
  * Sends the decoded DML over wire.
  *
@@ -547,7 +703,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = change->txn->xid;
 
-	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+	relentry = get_rel_sync_entry(data, relation);
 
 	/* First check the table filter */
 	switch (change->action)
@@ -571,8 +727,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	maybe_send_schema(ctx, txn, change, relation, relentry);
-
 	/* Send the data */
 	switch (change->action)
 	{
@@ -580,6 +734,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter; break so the context switch is undone. */
+				if (!pgoutput_row_filter(relation, NULL, tuple, relentry->qual))
+					break;
+
+				/*
+				 * Schema should be sent before the logic that replaces the
+				 * relation because it also sends the ancestor's relation.
+				 */
+				maybe_send_schema(ctx, txn, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -603,6 +767,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				&change->data.tp.oldtuple->tuple : NULL;
 				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter; break so the context switch is undone. */
+				if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry->qual))
+					break;
+
+				maybe_send_schema(ctx, txn, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -631,6 +801,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
 
+				/* Check row filter; break so the context switch is undone. */
+				if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry->qual))
+					break;
+
+				maybe_send_schema(ctx, txn, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -689,12 +865,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	for (i = 0; i < nrelations; i++)
 	{
 		Relation	relation = relations[i];
-		Oid			relid = RelationGetRelid(relation);
 
 		if (!is_publishable_relation(relation))
 			continue;
 
-		relentry = get_rel_sync_entry(data, relid);
+		relentry = get_rel_sync_entry(data, relation);
 
 		if (!relentry->pubactions.pubtruncate)
 			continue;
@@ -704,10 +879,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		 * root tables through it.
 		 */
 		if (relation->rd_rel->relispartition &&
-			relentry->publish_as_relid != relid)
+			relentry->publish_as_relid != relentry->relid)
 			continue;
 
-		relids[nrelids++] = relid;
+		relids[nrelids++] = relentry->relid;
 		maybe_send_schema(ctx, txn, change, relation, relentry);
 	}
 
@@ -1005,16 +1180,21 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  * when publishing.
  */
 static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation rel)
 {
 	RelationSyncEntry *entry;
-	bool		am_partition = get_rel_relispartition(relid);
-	char		relkind = get_rel_relkind(relid);
+	Oid			relid;
+	bool		am_partition;
+	char		relkind;
 	bool		found;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
 
+	relid = RelationGetRelid(rel);
+	am_partition = get_rel_relispartition(relid);
+	relkind = get_rel_relkind(relid);
+
 	/* Find cached relation info, creating if not found */
 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
 											  (void *) &relid,
@@ -1030,6 +1210,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->qual = NIL;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1063,6 +1244,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			HeapTuple	rftuple;
+			Datum		rfdatum;
+			bool		rfisnull;
 
 			if (pub->alltables)
 			{
@@ -1122,9 +1306,29 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
+			/*
+			 * Cache row filter, if available. All publication-table mappings
+			 * must be checked. If it is a partition and pubviaroot is true,
+			 * use the row filter of the topmost partitioned table instead of
+			 * the row filter of its own partition.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
+
+				if (!rfisnull)
+				{
+					Node	   *rfnode;
+
+					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					rfnode = stringToNode(TextDatumGetCString(rfdatum));
+					entry->qual = lappend(entry->qual, rfnode);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rftuple);
+			}
 		}
 
 		list_free(pubids);
@@ -1242,6 +1446,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	HASH_SEQ_STATUS status;
 	RelationSyncEntry *entry;
+	MemoryContext oldctx;
 
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
@@ -1251,6 +1456,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	if (RelationSyncCache == NULL)
 		return;
 
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
 	/*
 	 * There is no way to find which entry in our cache the hash belongs to so
 	 * mark the whole cache as invalid.
@@ -1268,5 +1475,11 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+
+		if (entry->qual != NIL)
+			list_free_deep(entry->qual);
+		entry->qual = NIL;
 	}
+
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 321152151d..6f944ec60d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4172,6 +4172,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prrelqual;
 	int			i,
 				j,
 				ntups;
@@ -4182,9 +4183,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	query = createPQExpBuffer();
 
 	/* Collect all publication membership info. */
-	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query,
+							 "SELECT tableoid, oid, prpubid, prrelid, "
+							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
+							 "FROM pg_catalog.pg_publication_rel");
+	else
+		appendPQExpBufferStr(query,
+							 "SELECT tableoid, oid, prpubid, prrelid, "
+							 "NULL AS prrelqual "
+							 "FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4193,6 +4201,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prrelqual = PQfnumber(res, "prrelqual");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4233,6 +4242,10 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].dobj.name = tbinfo->dobj.name;
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
+		if (PQgetisnull(res, i, i_prrelqual))
+			pubrinfo[j].pubrelqual = NULL;
+		else
+			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
 
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
@@ -4265,8 +4278,11 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ba9bc6ddd2..7d72d498c1 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -626,6 +626,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 2abf255798..e2e64cb3bf 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6329,8 +6329,15 @@ describePublications(const char *pattern)
 		if (!puballtables)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname");
+			if (pset.sversion >= 150000)
+				appendPQExpBuffer(&buf,
+								  ", pg_get_expr(pr.prqual, c.oid)");
+			else
+				appendPQExpBuffer(&buf,
+								  ", NULL");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
@@ -6359,6 +6366,10 @@ describePublications(const char *pattern)
 								  PQgetvalue(tabres, j, 0),
 								  PQgetvalue(tabres, j, 1));
 
+				if (!PQgetisnull(tabres, j, 2))
+					appendPQExpBuffer(&buf, " WHERE (%s)",
+									  PQgetvalue(tabres, j, 2));
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(tabres);
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f332bad4d4..333c2b581d 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -83,6 +83,13 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationInfo
+{
+	Oid			relid;
+	Relation	relation;
+	Node	   *whereClause;
+} PublicationRelationInfo;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 											  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..154bb61777 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN			/* variable-length fields start here */
+	pg_node_tree prqual;		/* qualifications */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8287, 8288);
+
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index d9e417bcd7..2037705f45 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -491,6 +491,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..cf815cc0f2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3624,12 +3624,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar   *relation;		/* relation to be published */
+	Node	   *whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3642,7 +3649,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 1500de2dd0..4537543a7b 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -80,6 +80,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
 	EXPR_KIND_CYCLE_MARK,		/* cycle mark value */
+	EXPR_KIND_PUBLICATION_WHERE,	/* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7a4e..96d869dd27 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -156,6 +156,40 @@ Tables:
 
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+ERROR:  functions are not allowed in publication WHERE expressions
+LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ...
+                                                             ^
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+ERROR:  syntax error at or near "WHERE"
+LINE 1: ...R PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e <...
+                                                             ^
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl3" WHERE (((e > 300) AND (e < 500)))
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index d844075368..35211c56f6 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -93,6 +93,29 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+\dRp+ testpub5
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
+
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 SET client_min_messages = 'ERROR';
diff --git a/src/test/subscription/t/020_row_filter.pl b/src/test/subscription/t/020_row_filter.pl
new file mode 100644
index 0000000000..35a41741d3
--- /dev/null
+++ b/src/test/subscription/t/020_row_filter.pl
@@ -0,0 +1,221 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+# create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned WHERE (a < 5000)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)"
+);
+
+# test row filtering
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"
+);
+# use partition row filter:
+# - replicate (1, 100) because 1 < 6000 is true
+# - don't replicate (8000, 101) because 8000 < 6000 is false
+# - replicate (15000, 102) because partition tab_rowfilter_greater_10k doesn't have row filter
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(8000, 101),(15000, 102)"
+);
+# insert directly into partition
+# use partition row filter: replicate (2, 200) because 2 < 6000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200)");
+# use partition row filter: replicate (5500, 300) because 5500 < 6000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(5500, 300)");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is( $result, qq(1001|test 1001
+1002|test 1002
+1980|not filtered), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(3|6|18), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2");
+is($result, qq(1|100
+2|200
+5500|300), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2");
+is($result, qq(15000|102), 'check filtered data was copied to subscriber');
+
+# test row filter (INSERT, UPDATE, DELETE)
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1600, 'test 1600')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1700, 'test 1700')");
+# UPDATE is not replicated; with b = NULL the row filter yields NULL (not true)
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601");
+# DELETE is not replicated; b is not part of the PK or replica identity, so the
+# old tuple contains b = NULL and the row filter does not evaluate to true
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_rowfilter_1 WHERE a = 1700");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is($result, qq(1001|test 1001
+1002|test 1002
+1600|test 1600
+1601|test 1601 updated
+1700|test 1700
+1980|not filtered), 'check replicated changes respect the row filter');
+
+# publish using partitioned table
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)");
+$node_subscriber->safe_psql('postgres',
+	"TRUNCATE TABLE tab_rowfilter_partitioned");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
+# use partitioned table row filter: replicate, 4000 < 5000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400)");
+# use partitioned table row filter: replicate, 4500 < 5000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)");
+# use partitioned table row filter: don't replicate, 5600 < 5000 is false
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)");
+# use partitioned table row filter: don't replicate, 16000 < 5000 is false
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 1950)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+4000|400
+4500|450), 'check publish_via_partition_root behavior');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1

From 0ff56520b40129c1fc1fbb1379214a1b94413a9f Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sun, 31 Jan 2021 20:48:43 -0300
Subject: [PATCH 2/3] Measure row filter overhead

---
 src/backend/replication/pgoutput/pgoutput.c | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 10f85365fc..4677d21b62 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -618,6 +618,8 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	MemoryContext oldcxt;
 	ListCell   *lc;
 	bool		result = true;
+	instr_time	start_time;
+	instr_time	end_time;
 
 	/* Bail out if there is no row filter */
 	if (rowfilter == NIL)
@@ -627,6 +629,8 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 		 get_rel_name(relation->rd_id));
 
+	INSTR_TIME_SET_CURRENT(start_time);
+
 	tupdesc = RelationGetDescr(relation);
 
 	PushActiveSnapshot(GetTransactionSnapshot());
@@ -673,6 +677,11 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	FreeExecutorState(estate);
 	PopActiveSnapshot();
 
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+
+	elog(DEBUG2, "row filter time: %0.3f us", INSTR_TIME_GET_DOUBLE(end_time) * 1e6);
+
 	return result;
 }
 
-- 
2.20.1

From 4275d48c9f59b6f3b2bc99b4563119da3909dd56 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 26 Feb 2021 21:08:10 -0300
Subject: [PATCH 3/3] Debug messages

---
 src/backend/parser/parse_expr.c           | 6 ++++++
 src/test/subscription/t/020_row_filter.pl | 1 +
 2 files changed, 7 insertions(+)

diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index fc4170e723..bd02576c86 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -116,6 +116,12 @@ transformExprRecurse(ParseState *pstate, Node *expr)
 	if (expr == NULL)
 		return NULL;
 
+	/*
+	 * T_FuncCall: 349
+	 * EXPR_KIND_PUBLICATION_WHERE: 42
+	 */
+	elog(DEBUG3, "nodeTag(expr): %d ; pstate->p_expr_kind: %d ; nodeToString(expr): %s", nodeTag(expr), pstate->p_expr_kind, nodeToString(expr));
+
 	/* Guard against stack overflow due to overly complex expressions */
 	check_stack_depth();
 
diff --git a/src/test/subscription/t/020_row_filter.pl b/src/test/subscription/t/020_row_filter.pl
index 35a41741d3..8c305cf1dc 100644
--- a/src/test/subscription/t/020_row_filter.pl
+++ b/src/test/subscription/t/020_row_filter.pl
@@ -8,6 +8,7 @@ use Test::More tests => 7;
 # create publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'log_min_messages = DEBUG3');
 $node_publisher->start;
 
 # create subscriber node
-- 
2.20.1
