Em qua, 28 de fev de 2018 às 20:03, Euler Taveira
<eu...@timbira.com.br> escreveu:
> The attached patches add support for filtering rows in the publisher.
>
I rebased the patch. I added row filtering for initial
synchronization, pg_dump support and psql support. 0001 removes unused
code. 0002 reduces memory use. 0003 passes only the structure member that
is used by create_estate_for_relation. 0004 renames a parser node so it
can be reused for row filtering. 0005 is the feature. 0006 prints the
WHERE expression in psql. 0007 adds pg_dump support. 0008 is only for
debugging purposes (I'm not sure whether some of these messages will be
part of the final patch). 0001, 0002, 0003 and 0008 are not mandatory
for this feature.

Comments?


-- 
   Euler Taveira                                   Timbira -
http://www.timbira.com.br/
   PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento
From b2e56eaa9e16246c8158ff2961a6a4b2acbd096b Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 18:39:22 +0000
Subject: [PATCH 1/8] Remove unused atttypmod column from initial table
 synchronization

 atttypmod was added in commit 7c4f52409a8c7d85ed169bbbc1f6092274d03920
 but has never been used. Removing it is safe because COPY from the
 publisher does not need this information.
---
 src/backend/replication/logical/tablesync.c | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6e420d8..f285813 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -660,7 +660,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
-	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
 
@@ -704,7 +704,6 @@ fetch_remote_table_info(char *nspname, char *relname,
 	appendStringInfo(&cmd,
 					 "SELECT a.attname,"
 					 "       a.atttypid,"
-					 "       a.atttypmod,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
@@ -714,7 +713,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
 					 lrel->remoteid, lrel->remoteid);
-	res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -735,7 +734,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
+		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
-- 
2.7.4

From 797a0e8d858b490df7a9e1526f76e49fe1e10215 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 17:37:36 +0000
Subject: [PATCH 2/8] Store number of tuples in WalRcvExecResult

It is useful information when allocating memory for queries
that return more than one row. It reduces memory allocation
for initial table synchronization.

While at it, since we have the number of columns, allocate only nfields
for cstrs instead of MaxTupleAttributeNumber.
---
 src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 9 ++++++---
 src/backend/replication/logical/tablesync.c                 | 5 ++---
 src/include/replication/walreceiver.h                       | 1 +
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 1e1695e..2533e3a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -865,6 +865,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 				 errdetail("Expected %d fields, got %d fields.",
 						   nRetTypes, nfields)));
 
+	walres->ntuples = PQntuples(pgres);
 	walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
 
 	/* Create tuple descriptor corresponding to expected result. */
@@ -875,7 +876,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
 
 	/* No point in doing more here if there were no tuples returned. */
-	if (PQntuples(pgres) == 0)
+	if (walres->ntuples == 0)
 		return;
 
 	/* Create temporary context for local allocations. */
@@ -884,15 +885,17 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 									   ALLOCSET_DEFAULT_SIZES);
 
 	/* Process returned rows. */
-	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
+	for (tupn = 0; tupn < walres->ntuples; tupn++)
 	{
-		char	   *cstrs[MaxTupleAttributeNumber];
+		char	**cstrs;
 
 		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
 
+		cstrs = palloc(nfields * sizeof(char *));
+
 		/*
 		 * Fill cstrs with null-terminated strings of column values.
 		 */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f285813..e119781 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -720,9 +720,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 				(errmsg("could not fetch table info for table \"%s.%s\": %s",
 						nspname, relname, res->err)));
 
-	/* We don't know the number of rows coming, so allocate enough space. */
-	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
-	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
+	lrel->attnames = palloc0(res->ntuples * sizeof(char *));
+	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
 	natt = 0;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 5913b58..62f63f9 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -197,6 +197,7 @@ typedef struct WalRcvExecResult
 	char	   *err;
 	Tuplestorestate *tuplestore;
 	TupleDesc	tupledesc;
+	int			ntuples;
 } WalRcvExecResult;
 
 /* libpqwalreceiver hooks */
-- 
2.7.4

From 6dd5414b8dcf7b94b0901c9dfbd50d68a4c33ba1 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 02:21:03 +0000
Subject: [PATCH 3/8] Refactor function create_estate_for_relation

Relation localrel is the only LogicalRepRelMapEntry structure member
that is useful for create_estate_for_relation.
---
 src/backend/replication/logical/worker.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 277da69..fa2f0ad 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -187,7 +187,7 @@ ensure_transaction(void)
  * This is based on similar code in copy.c
  */
 static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
 	ResultRelInfo *resultRelInfo;
@@ -197,13 +197,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	rte = makeNode(RangeTblEntry);
 	rte->rtekind = RTE_RELATION;
-	rte->relid = RelationGetRelid(rel->localrel);
-	rte->relkind = rel->localrel->rd_rel->relkind;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
 	rte->rellockmode = AccessShareLock;
 	ExecInitRangeTable(estate, list_make1(rte));
 
 	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -607,7 +607,7 @@ apply_handle_insert(StringInfo s)
 	}
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 
@@ -713,7 +713,7 @@ apply_handle_update(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 	localslot = ExecInitExtraTupleSlot(estate,
@@ -831,7 +831,7 @@ apply_handle_delete(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel));
 	localslot = ExecInitExtraTupleSlot(estate,
-- 
2.7.4

From 848bc00f5e5c7b16c768eca2055a56aaef579817 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 24 Jan 2018 17:01:31 -0200
Subject: [PATCH 4/8] Rename a WHERE node

A WHERE clause will be used for row filtering in logical replication. We
already have a similar node: 'WHERE (condition here)'. Let's rename the
node to a generic name and use it for row filtering too.
---
 src/backend/parser/gram.y | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6d23bfb..756f0dd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -470,7 +470,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause operator_def_arg
+				OptWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -3742,7 +3742,7 @@ ConstraintElem:
 					$$ = (Node *)n;
 				}
 			| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
-				opt_c_include opt_definition OptConsTableSpace  ExclusionWhereClause
+				opt_c_include opt_definition OptConsTableSpace  OptWhereClause
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
@@ -3844,7 +3844,7 @@ ExclusionConstraintElem: index_elem WITH any_operator
 			}
 		;
 
-ExclusionWhereClause:
+OptWhereClause:
 			WHERE '(' a_expr ')'					{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
-- 
2.7.4

From 80f710ffe42329d321e803cffc45314f35eda6c2 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 04:03:13 +0000
Subject: [PATCH 5/8] Row filtering for logical replication

When you define or modify a publication, you can optionally filter the
rows to be published using a WHERE condition. This condition is any
expression that evaluates to boolean. Only those rows that
satisfy the WHERE condition will be sent to subscribers.
---
 doc/src/sgml/ref/alter_publication.sgml     |   9 ++-
 doc/src/sgml/ref/create_publication.sgml    |  14 +++-
 src/backend/catalog/pg_publication.c        |  46 +++++++++--
 src/backend/commands/publicationcmds.c      |  69 +++++++++++-----
 src/backend/parser/gram.y                   |  26 ++++--
 src/backend/parser/parse_agg.c              |  10 +++
 src/backend/parser/parse_expr.c             |   4 +
 src/backend/parser/parse_func.c             |   2 +
 src/backend/replication/logical/proto.c     |   2 +-
 src/backend/replication/logical/relation.c  |  13 +++
 src/backend/replication/logical/tablesync.c | 119 +++++++++++++++++++++++++---
 src/backend/replication/logical/worker.c    |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |  97 ++++++++++++++++++++++-
 src/include/catalog/pg_publication.h        |   8 +-
 src/include/catalog/pg_publication_rel.h    |   8 +-
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/parsenodes.h              |  11 ++-
 src/include/parser/parse_node.h             |   3 +-
 src/include/replication/logicalproto.h      |   2 +
 src/include/replication/logicalrelation.h   |   2 +
 src/test/regress/expected/misc_sanity.out   |   3 +-
 src/test/subscription/t/010_row_filter.pl   |  97 +++++++++++++++++++++++
 22 files changed, 492 insertions(+), 56 deletions(-)
 create mode 100644 src/test/subscription/t/010_row_filter.pl

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 534e598..5984915 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
@@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the expression.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 99f87ca..d5fed30 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 
@@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included. If the optional
+      <literal>WHERE</literal> clause is specified, rows that do not satisfy
+      the <replaceable class="parameter">expression</replaceable> will not be
+      published. Note that parentheses are required around the expression.
      </para>
 
      <para>
@@ -184,6 +187,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
   </para>
 
   <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
+  <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
 CREATE PUBLICATION alltables FOR ALL TABLES;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 3ecf6d5..f9f18a6 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -34,6 +34,10 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -142,18 +146,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState		*pstate;
+	RangeTblEntry	*rte;
+	Node			*whereclause;
 
 	rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -173,10 +180,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										AccessShareLock,
+										NULL, false, false);
+	addRTEtoQuery(pstate, rte, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+								copyObject(targetrel->whereClause),
+								EXPR_KIND_PUBLICATION_WHERE,
+								"PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -187,6 +211,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add row filter, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -203,11 +233,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the row filter expression */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	heap_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 6f7762a..d4fca7f 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -338,6 +338,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	/*
+	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  The
+	 * grammar uses publication_table_list (which accepts a WHERE clause) for
+	 * DROP TABLE as well, so reject the WHERE clause here.  Using
+	 * relation_expr_list just for the DROP TABLE part is not worth the trouble.
+	 */
+	if (stmt->tableAction == DEFELEM_DROP)
+	{
+		ListCell	*lc;
+
+		foreach(lc, stmt->tables)
+		{
+			PublicationTable *t = lfirst(lc);
+			if (t->whereClause)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
+								NameStr(pubform->pubname))));
+		}
+	}
+
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
@@ -359,9 +380,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				PublicationRelationQual	*newrel = (PublicationRelationQual *) lfirst(newlc);
 
-				if (RelationGetRelid(newrel) == oldrelid)
+				if (RelationGetRelid(newrel->relation) == oldrelid)
 				{
 					found = true;
 					break;
@@ -370,7 +391,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			if (!found)
 			{
-				Relation	oldrel = heap_open(oldrelid,
+				PublicationRelationQual *oldrel = palloc(sizeof(PublicationRelationQual));
+				oldrel->relation = heap_open(oldrelid,
 											   ShareUpdateExclusiveLock);
 
 				delrels = lappend(delrels, oldrel);
@@ -493,16 +515,18 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationQual	*relqual;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = lfirst(lc);
-		Relation	rel;
-		bool		recurse = rv->inh;
-		Oid			myrelid;
+		PublicationTable	*t = lfirst(lc);
+		RangeVar  			*rv = t->relation;
+		Relation			rel;
+		bool				recurse = rv->inh;
+		Oid					myrelid;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -521,7 +545,10 @@ OpenTableList(List *tables)
 			heap_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-		rels = lappend(rels, rel);
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = rel;
+		relqual->whereClause = t->whereClause;
+		rels = lappend(rels, relqual);
 		relids = lappend_oid(relids, myrelid);
 
 		if (recurse)
@@ -551,7 +578,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = heap_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				relqual = palloc(sizeof(PublicationRelationQual));
+				relqual->relation = rel;
+				/* child inherits WHERE clause from parent */
+				relqual->whereClause = t->whereClause;
+				rels = lappend(rels, relqual);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -572,10 +603,12 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 
-		heap_close(rel, NoLock);
+		heap_close(rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -591,13 +624,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
+						   RelationGetRelationName(rel->relation));
 
 		obj = publication_add_relation(pubid, rel, if_not_exists);
 		if (stmt)
@@ -623,8 +656,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel->relation);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
@@ -636,7 +669,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(rel->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 756f0dd..edf1fef 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -398,13 +398,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
-				publication_name_list
+				publication_name_list publication_table_list
 				vacuum_relation_list opt_vacuum_relation_list
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9526,7 +9526,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9557,7 +9557,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9565,7 +9565,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9573,7 +9573,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9583,6 +9583,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 61727e1..128d0b9 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -522,6 +522,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in CALL arguments");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE expressions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE expressions");
+
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -902,6 +909,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("window functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 385e54a..55cc385 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -1848,6 +1848,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 			break;
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("cannot use subquery in CALL argument");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE expression");
+			break;
 
 			/*
@@ -3475,6 +3478,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "PARTITION BY";
 		case EXPR_KIND_CALL_ARGUMENT:
 			return "CALL";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 4425715..e997ea0 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2369,6 +2369,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 			break;
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("set-returning functions are not allowed in CALL arguments");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1945171..7ce9378 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -378,7 +378,7 @@ logicalrep_write_rel(StringInfo out, Relation rel)
 LogicalRepRelation *
 logicalrep_read_rel(StringInfo in)
 {
-	LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
+	LogicalRepRelation *rel = palloc0(sizeof(LogicalRepRelation));
 
 	rel->remoteid = pq_getmsgint(in, 4);
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 1f20df5..8cbb394 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -140,6 +140,16 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
 	}
 	bms_free(remoterel->attkeys);
 
+	if (remoterel->nrowfilters > 0)
+	{
+		int		i;
+
+		for (i = 0; i < remoterel->nrowfilters; i++)
+			pfree(remoterel->rowfiltercond[i]);
+
+		pfree(remoterel->rowfiltercond);
+	}
+
 	if (entry->attrmap)
 		pfree(entry->attrmap);
 }
@@ -187,6 +197,11 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 	}
 	entry->remoterel.replident = remoterel->replident;
 	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+	/* Copy the filter count too: logicalrep_relmap_free_entry() relies on it. */
+	entry->remoterel.nrowfilters = remoterel->nrowfilters;
+	entry->remoterel.rowfiltercond = palloc(remoterel->nrowfilters * sizeof(char *));
+	for (i = 0; i < remoterel->nrowfilters; i++)
+		entry->remoterel.rowfiltercond[i] = pstrdup(remoterel->rowfiltercond[i]);
 	MemoryContextSwitchTo(oldctx);
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e119781..fa7c111 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -661,8 +661,14 @@ fetch_remote_table_info(char *nspname, char *relname,
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
 	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			rowfilterRow[1] = {TEXTOID};
 	bool		isnull;
-	int			natt;
+	int			n;
+	ListCell   *lc;
+	bool		first;
+
+	/* Avoid trashing relation map cache */
+	memset(lrel, 0, sizeof(LogicalRepRelation));
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -724,20 +730,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
-	natt = 0;
+	n = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
+		lrel->attnames[n] =
 			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+		lrel->atttyps[n] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
-			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+			lrel->attkeys = bms_add_member(lrel->attkeys, n);
 
 		/* Should never happen. */
-		if (++natt >= MaxTupleAttributeNumber)
+		if (++n >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
 				 nspname, relname);
 
@@ -745,7 +751,54 @@ fetch_remote_table_info(char *nspname, char *relname,
 	}
 	ExecDropSingleTupleTableSlot(slot);
 
-	lrel->natts = natt;
+	lrel->natts = n;
+
+	walrcv_clear_result(res);
+
+	/* Fetch row filters; the query runs on the publisher, so use the remote OID */
+	resetStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT pg_get_expr(prrowfilter, prrelid) FROM pg_publication p INNER JOIN pg_publication_rel pr ON (p.oid = pr.prpubid) WHERE pr.prrelid = %u AND p.pubname IN (", lrel->remoteid);
+
+	first = true;
+	foreach(lc, MySubscription->publications)
+	{
+		char	*pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 1, rowfilterRow);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch row filter info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res->err)));
+
+	lrel->rowfiltercond = palloc0(res->ntuples * sizeof(char *));
+
+	n = 0;
+	slot = MakeSingleTupleTableSlot(res->tupledesc);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		Datum rf = slot_getattr(slot, 1, &isnull);
+
+		if (!isnull)
+		{
+			char *p = TextDatumGetCString(rf);
+			lrel->rowfiltercond[n++] = p;
+		}
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	lrel->nrowfilters = n;
 
 	walrcv_clear_result(res);
 	pfree(cmd.data);
@@ -778,10 +831,57 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/* list of columns for COPY */
+	attnamelist = make_copy_attnamelist(relmapentry);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "COPY %s TO STDOUT",
-					 quote_qualified_identifier(lrel.nspname, lrel.relname));
+	/*
+	 * If publication has any row filter, build a SELECT query with OR'ed row
+	 * filters for COPY.
+	 * If no row filters are available, use COPY for all
+	 * table contents.
+	 */
+	if (lrel.nrowfilters > 0)
+	{
+		ListCell   *lc;
+		bool		first;
+		int			i;
+
+		appendStringInfoString(&cmd, "COPY (SELECT ");
+		/* list of attribute names */
+		first = true;
+		foreach(lc, attnamelist)
+		{
+			char	*col = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+			appendStringInfo(&cmd, "%s", quote_identifier(col));
+		}
+		appendStringInfo(&cmd, " FROM %s",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		appendStringInfoString(&cmd, " WHERE ");
+		/* list of OR'ed filters */
+		first = true;
+		for (i = 0; i < lrel.nrowfilters; i++)
+		{
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, " OR ");
+			appendStringInfo(&cmd, "%s", lrel.rowfiltercond[i]);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
+	}
+	else
+	{
+		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+	}
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -796,7 +896,6 @@ copy_table(Relation rel)
 	addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
 								  NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
 
 	/* Do the copy */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa2f0ad..f28a74f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -186,7 +186,7 @@ ensure_transaction(void)
  *
  * This is based on similar code in copy.c
  */
-static EState *
+EState *
 create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86e0951..1f4a3d3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,13 +12,23 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+
+#include "executor/executor.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "parser/parse_coerce.h"
 
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/memutils.h"
@@ -58,6 +68,7 @@ typedef struct RelationSyncEntry
 	bool		schema_sent;	/* did we send the schema? */
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List		*row_filter;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -329,6 +340,63 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* ... then check row filter */
+	if (list_length(relentry->row_filter) > 0)
+	{
+		HeapTuple		old_tuple;
+		HeapTuple		new_tuple;
+		TupleDesc		tupdesc;
+		EState			*estate;
+		ExprContext		*ecxt;
+		MemoryContext	oldcxt;
+		ListCell		*lc;
+
+		old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
+		new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL;
+		tupdesc = RelationGetDescr(relation);
+		estate = create_estate_for_relation(relation);
+
+		/* prepare context per tuple */
+		ecxt = GetPerTupleExprContext(estate);
+		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+		ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc);
+
+		ExecStoreHeapTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, false);
+
+		foreach (lc, relentry->row_filter)
+		{
+			Node		*row_filter;
+			ExprState	*expr_state;
+			Expr		*expr;
+			Oid			expr_type;
+			Datum		res;
+			bool		isnull;
+
+			row_filter = (Node *) lfirst(lc);
+
+			/* evaluates row filter */
+			expr_type = exprType(row_filter);
+			expr = (Expr *) coerce_to_target_type(NULL, row_filter, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+			expr = expression_planner(expr);
+			expr_state = ExecInitExpr(expr, NULL);
+			res = ExecEvalExpr(expr_state, ecxt, &isnull);
+
+			/* if tuple does not match row filter, bail out */
+			if (!DatumGetBool(res) || isnull)
+			{
+				MemoryContextSwitchTo(oldcxt);
+				ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+				FreeExecutorState(estate);
+				return;
+			}
+		}
+
+		MemoryContextSwitchTo(oldcxt);
+
+		ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+		FreeExecutorState(estate);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -564,10 +632,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->row_filter = NIL;
 
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			HeapTuple	rf_tuple;
+			Datum		rf_datum;
+			bool		rf_isnull;
 
 			if (pub->alltables || list_member_oid(pubids, pub->oid))
 			{
@@ -577,9 +649,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
+			/* Cache row filters, if available */
+			rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rf_tuple))
+			{
+				rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prrowfilter, &rf_isnull);
+
+				if (!rf_isnull)
+				{
+					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					char	*s = TextDatumGetCString(rf_datum);
+					Node	*rf_node = stringToNode(s);
+					entry->row_filter = lappend(entry->row_filter, rf_node);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rf_tuple);
+			}
 		}
 
 		list_free(pubids);
@@ -654,5 +740,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+		if (list_length(entry->row_filter) > 0)
+			list_free(entry->row_filter);
+		entry->row_filter = NIL;
+	}
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index a5d5570..e78222e 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -76,6 +76,12 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Relation	relation;
+	Node		*whereClause;
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -84,7 +90,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index d97b0fe..f499253 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -28,8 +28,12 @@
  */
 CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 {
-	Oid			prpubid;		/* Oid of the publication */
-	Oid			prrelid;		/* Oid of the relation */
+	Oid				prpubid;		/* Oid of the publication */
+	Oid				prrelid;		/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN				/* variable-length fields start here */
+	pg_node_tree	prrowfilter;	/* nodeToString representation of row filter */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index cac6ff0..26b79d7 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -475,6 +475,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index aa4a0db..8ac4d81 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3448,12 +3448,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;		/* relation to be published */
+	Node		*whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3466,7 +3473,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 0230543..8e3c735 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -69,7 +69,8 @@ typedef enum ParseExprKind
 	EXPR_KIND_TRIGGER_WHEN,		/* WHEN condition in CREATE TRIGGER */
 	EXPR_KIND_POLICY,			/* USING or WITH CHECK expr in policy */
 	EXPR_KIND_PARTITION_EXPRESSION, /* PARTITION BY expression */
-	EXPR_KIND_CALL_ARGUMENT		/* procedure argument in CALL */
+	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
+	EXPR_KIND_PUBLICATION_WHERE	/* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 8192f79..75be2f0 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -50,6 +50,8 @@ typedef struct LogicalRepRelation
 	Oid		   *atttyps;		/* column types */
 	char		replident;		/* replica identity */
 	Bitmapset  *attkeys;		/* Bitmap of key columns */
+	char	  **rowfiltercond;	/* condition for row filtering */
+	int			nrowfilters;	/* number of row filters */
 } LogicalRepRelation;
 
 /* Type mapping info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 73e4805..dd54295 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index 2d3522b..62eabbd 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -105,5 +105,6 @@ ORDER BY 1, 2;
  pg_index                | indpred       | pg_node_tree
  pg_largeobject          | data          | bytea
  pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+ pg_publication_rel      | prrowfilter   | pg_node_tree
+(12 rows)
 
diff --git a/src/test/subscription/t/010_row_filter.pl b/src/test/subscription/t/010_row_filter.pl
new file mode 100644
index 0000000..6c174fa
--- /dev/null
+++ b/src/test/subscription/t/010_row_filter.pl
@@ -0,0 +1,97 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+
+my $result = $node_publisher->psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 DROP TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+is($result, 3, "syntax error for ALTER PUBLICATION DROP TABLE");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 2 = 0)");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)");
+
+# test row filtering
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1003) x");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 10)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+#$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter_1");
+is($result, qq(1980|not filtered
+1001|test 1001
+1002|test 1002
+1003|test 1003), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(7|2|10), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check filtered data was copied to subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.7.4

From 7bfdda04dbfdb7acba84296bdf15fe49bafe2d7c Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Thu, 17 May 2018 20:52:28 +0000
Subject: [PATCH 6/8] Print publication WHERE condition in psql

---
 src/bin/psql/describe.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 4ca0db1..7aab8b9 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5445,7 +5445,8 @@ describePublications(const char *pattern)
 		if (!puballtables)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
+							  "SELECT n.nspname, c.relname,\n"
+							  "  pg_catalog.pg_get_expr(pr.prrowfilter, c.oid)\n"
 							  "FROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
@@ -5475,6 +5476,10 @@ describePublications(const char *pattern)
 								  PQgetvalue(tabres, j, 0),
 								  PQgetvalue(tabres, j, 1));
 
+				if (!PQgetisnull(tabres, j, 2))
+					appendPQExpBuffer(&buf, "  WHERE %s",
+									PQgetvalue(tabres, j, 2));
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(tabres);
-- 
2.7.4

From be36719198f6ad3f90164510601b35118870c389 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Sat, 15 Sep 2018 02:52:00 +0000
Subject: [PATCH 7/8] Publication where condition support for pg_dump

---
 src/bin/pg_dump/pg_dump.c | 15 +++++++++++++--
 src/bin/pg_dump/pg_dump.h |  1 +
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c8d01ed..4c63694 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3911,6 +3911,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_tableoid;
 	int			i_oid;
 	int			i_pubname;
+	int			i_pubrelqual;
 	int			i,
 				j,
 				ntups;
@@ -3944,7 +3945,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 		/* Get the publication membership for the table. */
 		appendPQExpBuffer(query,
-						  "SELECT pr.tableoid, pr.oid, p.pubname "
+						  "SELECT pr.tableoid, pr.oid, p.pubname, "
+						  "pg_catalog.pg_get_expr(pr.prrowfilter, pr.prrelid) AS pubrelqual "
 						  "FROM pg_publication_rel pr, pg_publication p "
 						  "WHERE pr.prrelid = '%u'"
 						  "  AND p.oid = pr.prpubid",
@@ -3965,6 +3967,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		i_tableoid = PQfnumber(res, "tableoid");
 		i_oid = PQfnumber(res, "oid");
 		i_pubname = PQfnumber(res, "pubname");
+		i_pubrelqual = PQfnumber(res, "pubrelqual");
 
 		pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
 
@@ -3980,6 +3983,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 			pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
 			pubrinfo[j].pubtable = tbinfo;
 
+			if (PQgetisnull(res, j, i_pubrelqual))
+				pubrinfo[j].pubrelqual = NULL;
+			else
+				pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, j, i_pubrelqual));
+
 			/* Decide whether we want to dump it */
 			selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
 		}
@@ -4008,8 +4016,11 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubrinfo->pubname));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating drop query as drop query as the drop is
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 685ad78..c2dfae6 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -609,6 +609,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	TableInfo  *pubtable;
 	char	   *pubname;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
-- 
2.7.4

From f1571f8b607333efe3f4a70b5dc73dd069e89573 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 14 Mar 2018 00:53:17 +0000
Subject: [PATCH 8/8] Debug for row filtering

---
 src/backend/commands/publicationcmds.c      | 11 +++++
 src/backend/replication/logical/tablesync.c |  1 +
 src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index d4fca7f..27f1102 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -327,6 +327,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	Oid			pubid = HeapTupleGetOid(tup);
 	List	   *rels = NIL;
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+	ListCell	*xpto;
 
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (pubform->puballtables)
@@ -338,6 +339,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	foreach(xpto, stmt->tables)
+	{
+		PublicationTable *t = lfirst(xpto);
+
+		if (t->whereClause == NULL)
+			elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
+		else
+			elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
+	}
+
 	/*
 	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
 	 * publication_table_list node (that accepts a WHERE clause) but forbid the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index fa7c111..e0eb73c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -882,6 +882,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
+	elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data);
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1f4a3d3..c7f0e32 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -31,6 +31,7 @@
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -316,6 +317,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	Form_pg_class	class_form;
+	char			*schemaname;
+	char			*tablename;
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -340,6 +345,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	class_form = RelationGetForm(relation);
+	schemaname = get_namespace_name(class_form->relnamespace);
+	tablename = NameStr(class_form->relname);
+
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+		elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
+		elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
+		elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+
 	/* ... then check row filter */
 	if (list_length(relentry->row_filter) > 0)
 	{
@@ -356,6 +372,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		tupdesc = RelationGetDescr(relation);
 		estate = create_estate_for_relation(relation);
 
+#ifdef	_NOT_USED
+		if (old_tuple)
+		{
+			int i;
+
+			for (i = 0; i < tupdesc->natts; i++)
+			{
+				Form_pg_attribute	attr;
+				HeapTuple			type_tuple;
+				Oid					typoutput;
+				bool				typisvarlena;
+				bool				isnull;
+				Datum				val;
+				char				*outputstr = NULL;
+
+				attr = TupleDescAttr(tupdesc, i);
+
+				/* Figure out type name */
+				type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
+				if (HeapTupleIsValid(type_tuple))
+				{
+					/* Get information needed for printing values of a type */
+					getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
+
+					val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull);
+					if (!isnull)
+					{
+						outputstr = OidOutputFunctionCall(typoutput, val);
+						elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr);
+						pfree(outputstr);
+					}
+				}
+			}
+		}
+#endif
+
 		/* prepare context per tuple */
 		ecxt = GetPerTupleExprContext(estate);
 		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -371,6 +423,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Oid			expr_type;
 			Datum		res;
 			bool		isnull;
+			char		*s = NULL;
 
 			row_filter = (Node *) lfirst(lc);
 
@@ -381,14 +434,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			expr_state = ExecInitExpr(expr, NULL);
 			res = ExecEvalExpr(expr_state, ecxt, &isnull);
 
+			elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? "true" : "false");
+
 			/* if tuple does not match row filter, bail out */
 			if (!DatumGetBool(res) || isnull)
 			{
+				s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+				elog(DEBUG2, "row filter \"%s\" was not matched", s);
+				pfree(s);
+
 				MemoryContextSwitchTo(oldcxt);
 				ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
 				FreeExecutorState(estate);
 				return;
 			}
+
+			s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+			elog(DEBUG2, "row filter \"%s\" was matched", s);
+			pfree(s);
 		}
 
 		MemoryContextSwitchTo(oldcxt);
@@ -659,9 +722,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				{
 					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					char	*s = TextDatumGetCString(rf_datum);
+					char	*t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid)));
 					Node	*rf_node = stringToNode(s);
 					entry->row_filter = lappend(entry->row_filter, rf_node);
 					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid));
 				}
 
 				ReleaseSysCache(rf_tuple);
-- 
2.7.4

Reply via email to