Hi,

The attached patches add support for filtering rows in the publisher.
The output plugin applies the filter if one was defined in the CREATE
PUBLICATION command. An optional WHERE clause can be added after the
table name in CREATE PUBLICATION, for example:

CREATE PUBLICATION foo FOR TABLE departments WHERE (id > 2000 AND id <= 3000);

Rows that don't match the WHERE clause will not be sent to the subscribers.

Patches 0001 and 0002 are only refactors and can be applied
independently. 0003 doesn't include row filtering on initial
synchronization.

Comments?


-- 
   Euler Taveira                                   Timbira -
http://www.timbira.com.br/
   PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento
From ae95eb51dd72c2e8ba278da950b478f6c6741fc0 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 02:21:03 +0000
Subject: [PATCH 1/3] Refactor function create_estate_for_relation

Relation localrel is the only LogicalRepRelMapEntry structure member
that is useful for create_estate_for_relation.
---
 src/backend/replication/logical/worker.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04985c9..6820c1a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -183,7 +183,7 @@ ensure_transaction(void)
  * This is based on similar code in copy.c
  */
 static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
 	ResultRelInfo *resultRelInfo;
@@ -193,12 +193,12 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	rte = makeNode(RangeTblEntry);
 	rte->rtekind = RTE_RELATION;
-	rte->relid = RelationGetRelid(rel->localrel);
-	rte->relkind = rel->localrel->rd_rel->relkind;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
 	estate->es_range_table = list_make1(rte);
 
 	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -584,7 +584,7 @@ apply_handle_insert(StringInfo s)
 	}
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 
@@ -688,7 +688,7 @@ apply_handle_update(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 	localslot = ExecInitExtraTupleSlot(estate,
@@ -806,7 +806,7 @@ apply_handle_delete(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 	localslot = ExecInitExtraTupleSlot(estate,
-- 
2.7.4

From 8421809e38d3e1d43b00feaac4b8bccaa6738079 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 24 Jan 2018 17:01:31 -0200
Subject: [PATCH 2/3] Rename a WHERE node

A WHERE clause will be used for row filtering in logical replication. We
already have a similar node: 'WHERE (condition here)'. Let's rename the
node to a generic name and use it for row filtering too.
---
 src/backend/parser/gram.y | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index d99f2be..bf32362 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -468,7 +468,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause operator_def_arg
+				OptWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -3734,7 +3734,7 @@ ConstraintElem:
 					$$ = (Node *)n;
 				}
 			| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
-				opt_definition OptConsTableSpace ExclusionWhereClause
+				opt_definition OptConsTableSpace OptWhereClause
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
@@ -3831,7 +3831,7 @@ ExclusionConstraintElem: index_elem WITH any_operator
 			}
 		;
 
-ExclusionWhereClause:
+OptWhereClause:
 			WHERE '(' a_expr ')'					{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
-- 
2.7.4

From 6567e49f95823532bc2ceccf87ed570ca4ce398d Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 04:03:13 +0000
Subject: [PATCH 3/3] Row filtering for logical replication

When you define or modify a publication, you can optionally filter the
rows to be published using a WHERE condition. This condition is any
expression that evaluates to boolean. Only those rows that
satisfy the WHERE condition will be sent to subscribers.
---
 doc/src/sgml/ref/alter_publication.sgml     |  9 ++-
 doc/src/sgml/ref/create_publication.sgml    | 14 ++++-
 src/backend/catalog/pg_publication.c        | 45 +++++++++++--
 src/backend/commands/publicationcmds.c      | 69 ++++++++++++++------
 src/backend/parser/gram.y                   | 26 ++++++--
 src/backend/parser/parse_agg.c              | 10 +++
 src/backend/parser/parse_expr.c             |  5 ++
 src/backend/parser/parse_func.c             |  3 +
 src/backend/replication/logical/worker.c    |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 98 ++++++++++++++++++++++++++++-
 src/include/catalog/pg_publication.h        |  8 ++-
 src/include/catalog/pg_publication_rel.h    | 11 +++-
 src/include/nodes/nodes.h                   |  1 +
 src/include/nodes/parsenodes.h              | 11 +++-
 src/include/parser/parse_node.h             |  3 +-
 src/include/replication/logicalrelation.h   |  2 +
 src/test/subscription/t/001_rep_changes.pl  | 29 ++++++++-
 17 files changed, 299 insertions(+), 47 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 534e598..dc579b2 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
@@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">condition</replaceable> will
+      not be published.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index bfe12d5..e42f3d4 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE <replaceable class="parameter">condition</replaceable> ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 
@@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included. If the optional
+      <literal>WHERE</literal> clause is specified, rows that do not satisfy
+      the <replaceable class="parameter">condition</replaceable> will not be
+      published.
      </para>
 
      <para>
@@ -184,6 +187,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
   </para>
 
   <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
+  <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
 CREATE PUBLICATION alltables FOR ALL TABLES;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ba18258..43d754d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -34,6 +34,10 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -142,18 +146,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState		*pstate;
+	RangeTblEntry	*rte;
+	Node			*whereclause;
 
 	rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -173,10 +180,26 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										NULL, false, false);
+	addRTEtoQuery(pstate, rte, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+								copyObject(targetrel->whereClause),
+								EXPR_KIND_PUBLICATION_WHERE,
+								"PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -187,6 +210,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add row filter, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -203,11 +232,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the row filter expression */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	heap_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c5aa9e..96347bb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -324,6 +324,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	/*
+	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
+	 * publication_table_list node (that accepts a WHERE clause) but forbid the
+	 * WHERE clause in it.  Using a relation_expr_list node just for the
+	 * DROP TABLE part is not worth the trouble.
+	 */
+	if (stmt->tableAction == DEFELEM_DROP)
+	{
+		ListCell	*lc;
+
+		foreach(lc, stmt->tables)
+		{
+			PublicationTable *t = lfirst(lc);
+			if (t->whereClause)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
+								NameStr(pubform->pubname))));
+		}
+	}
+
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
@@ -345,9 +366,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				PublicationRelationQual	*newrel = (PublicationRelationQual *) lfirst(newlc);
 
-				if (RelationGetRelid(newrel) == oldrelid)
+				if (RelationGetRelid(newrel->relation) == oldrelid)
 				{
 					found = true;
 					break;
@@ -356,7 +377,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			if (!found)
 			{
-				Relation	oldrel = heap_open(oldrelid,
+				PublicationRelationQual *oldrel = palloc(sizeof(PublicationRelationQual));
+				oldrel->relation = heap_open(oldrelid,
 											   ShareUpdateExclusiveLock);
 
 				delrels = lappend(delrels, oldrel);
@@ -479,16 +501,18 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationQual	*relqual;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = lfirst(lc);
-		Relation	rel;
-		bool		recurse = rv->inh;
-		Oid			myrelid;
+		PublicationTable	*t = lfirst(lc);
+		RangeVar  			*rv = t->relation;
+		Relation			rel;
+		bool				recurse = rv->inh;
+		Oid					myrelid;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -507,7 +531,10 @@ OpenTableList(List *tables)
 			heap_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-		rels = lappend(rels, rel);
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = rel;
+		relqual->whereClause = t->whereClause;
+		rels = lappend(rels, relqual);
 		relids = lappend_oid(relids, myrelid);
 
 		if (recurse)
@@ -537,7 +564,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = heap_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				relqual = palloc(sizeof(PublicationRelationQual));
+				relqual->relation = rel;
+				/* child inherits WHERE clause from parent */
+				relqual->whereClause = t->whereClause;
+				rels = lappend(rels, relqual);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -558,10 +589,12 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 
-		heap_close(rel, NoLock);
+		heap_close(rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -577,13 +610,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
+						   RelationGetRelationName(rel->relation));
 
 		obj = publication_add_relation(pubid, rel, if_not_exists);
 		if (stmt)
@@ -609,8 +642,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel->relation);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
@@ -622,7 +655,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(rel->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index bf32362..94cdd7d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -396,13 +396,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
-				publication_name_list
+				publication_name_list publication_table_list
 				vacuum_relation_list opt_vacuum_relation_list
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9520,7 +9520,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9551,7 +9551,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9559,7 +9559,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9567,7 +9567,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9577,6 +9577,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 377a7ed..7e1c3d8 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -522,6 +522,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in CALL arguments");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE conditions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE conditions");
+
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -902,6 +909,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("window functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE conditions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 385e54a..7bd1695 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -1849,6 +1849,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("cannot use subquery in CALL argument");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE condition");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3475,6 +3478,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "PARTITION BY";
 		case EXPR_KIND_CALL_ARGUMENT:
 			return "CALL";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication WHERE";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 2a4ac09..8e9cc58 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2293,6 +2293,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("set-returning functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE conditions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6820c1a..fe0a6ca 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -182,7 +182,7 @@ ensure_transaction(void)
  *
  * This is based on similar code in copy.c
  */
-static EState *
+EState *
 create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d538f25..30bdefa 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,13 +12,23 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+
+#include "executor/executor.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "parser/parse_coerce.h"
 
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/lsyscache.h"
@@ -56,6 +66,7 @@ typedef struct RelationSyncEntry
 	bool		schema_sent;	/* did we send the schema? */
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List		*row_filter;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -286,6 +297,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* ... then check row filter */
+	if (list_length(relentry->row_filter) > 0)
+	{
+		HeapTuple		old_tuple;
+		HeapTuple		new_tuple;
+		TupleDesc		tupdesc;
+		EState			*estate;
+		ExprContext		*ecxt;
+		MemoryContext	oldcxt;
+		ListCell		*lc;
+
+		old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
+		new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL;
+		tupdesc = RelationGetDescr(relation);
+		estate = create_estate_for_relation(relation);
+
+		/* prepare context per tuple */
+		ecxt = GetPerTupleExprContext(estate);
+		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+		ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc);
+		MemoryContextSwitchTo(oldcxt);
+
+		ExecStoreTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, InvalidBuffer, false);
+
+		foreach (lc, relentry->row_filter)
+		{
+			Node		*row_filter;
+			ExprState	*expr_state;
+			Expr		*expr;
+			Oid			expr_type;
+			Datum		res;
+			bool		isnull;
+			char		*s = NULL;
+
+			row_filter = (Node *) lfirst(lc);
+
+			/* evaluates row filter */
+			expr_type = exprType(row_filter);
+			expr = (Expr *) coerce_to_target_type(NULL, row_filter, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+			expr = expression_planner(expr);
+			expr_state = ExecInitExpr(expr, NULL);
+			res = ExecEvalExpr(expr_state, ecxt, &isnull);
+
+			/* if tuple does not match row filter, bail out */
+			if (!DatumGetBool(res) || isnull)
+				return;
+
+			s = nodeToString(row_filter);
+			elog(DEBUG2, "filter \"%s\" was matched", s);
+			pfree(s);
+		}
+
+		ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+		FreeExecutorState(estate);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -506,10 +573,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = false;
+		entry->row_filter = NIL;
 
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			HeapTuple	rf_tuple;
+			Datum		rf_datum;
+			bool		rf_isnull;
 
 			/*
 			 * Skip tables that look like they are from a heap rewrite (see
@@ -543,9 +614,25 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete)
-				break;
+			/* Cache row filters, if available */
+			rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rf_tuple))
+			{
+				rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prrowfilter, &rf_isnull);
+
+				if (!rf_isnull)
+				{
+					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					char	*s = TextDatumGetCString(rf_datum);
+					Node	*rf_node = stringToNode(s);
+					entry->row_filter = lappend(entry->row_filter, rf_node);
+					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", s, pub->name, get_rel_name(relid));
+				}
+
+				ReleaseSysCache(rf_tuple);
+			}
 		}
 
 		list_free(pubids);
@@ -620,5 +707,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+		if (list_length(entry->row_filter) > 0)
+			list_free(entry->row_filter);
+		entry->row_filter = NIL;
+	}
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 37e77b8..28962e6 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,12 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Relation	relation;
+	Node		*whereClause;
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -94,7 +100,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index 033b600..585f855 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -29,8 +29,12 @@
 
 CATALOG(pg_publication_rel,6106)
 {
-	Oid			prpubid;		/* Oid of the publication */
-	Oid			prrelid;		/* Oid of the relation */
+	Oid				prpubid;		/* Oid of the publication */
+	Oid				prrelid;		/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN				/* variable-length fields start here */
+	pg_node_tree	prrowfilter;	/* nodeToString representation of row filter */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -45,8 +49,9 @@ typedef FormData_pg_publication_rel *Form_pg_publication_rel;
  * ----------------
  */
 
-#define Natts_pg_publication_rel				2
+#define Natts_pg_publication_rel				3
 #define Anum_pg_publication_rel_prpubid			1
 #define Anum_pg_publication_rel_prrelid			2
+#define	Anum_pg_publication_rel_prrowfilter		3
 
 #endif							/* PG_PUBLICATION_REL_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 74b094a..499d839 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -471,6 +471,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ac292bc..9800acf 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3419,12 +3419,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;		/* relation to be published */
+	Node		*whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3437,7 +3444,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 0230543..8e3c735 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -69,7 +69,8 @@ typedef enum ParseExprKind
 	EXPR_KIND_TRIGGER_WHEN,		/* WHEN condition in CREATE TRIGGER */
 	EXPR_KIND_POLICY,			/* USING or WITH CHECK expr in policy */
 	EXPR_KIND_PARTITION_EXPRESSION, /* PARTITION BY expression */
-	EXPR_KIND_CALL_ARGUMENT		/* procedure argument in CALL */
+	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
+	EXPR_KIND_PUBLICATION_WHERE	/* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2..32f1312 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern Oid	logicalrep_typmap_getid(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index e0104cd..d40ae03 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 17;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -31,6 +31,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_mixed (a int primary key, b text)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter (a int primary key, b text)");
 
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
@@ -39,6 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab_rep (a int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter (a int primary key, b text)");
 
 # different column count and order than on publisher
 $node_subscriber->safe_psql('postgres',
@@ -54,10 +58,12 @@ $node_publisher->safe_psql('postgres',
 );
 $node_publisher->safe_psql('postgres',
 	"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_row_filter FOR TABLE tab_rowfilter WHERE (a > 1000 AND b <> 'filtered')");
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only, tap_pub_row_filter"
 );
 
 $node_publisher->wait_for_catchup($appname);
@@ -76,6 +82,25 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+# row filter: only rows matching the publication's WHERE clause are replicated
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter (a, b) SELECT x, 'test ' || x FROM generate_series(990,1003) x");
+
+$node_publisher->wait_for_catchup($appname);
+# Rows failing (a > 1000 AND b <> 'filtered') must be absent; ORDER BY makes the row order deterministic
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter ORDER BY a");
+is($result, qq(1001|test 1001
+1002|test 1002
+1003|test 1003
+1980|not filtered), 'check filtered data was copied to subscriber');
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
-- 
2.7.4

Reply via email to