On Sun, Aug 29, 2021, at 11:14 PM, Peter Smith wrote:
> Here are the new v26* patches. This is a refactoring of the row-filter
> caches to remove all the logic from the get_rel_sync_entry function
> and delay it until if/when needed in the pgoutput_row_filter function.
> This is now implemented per Amit's suggestion to move all the cache
> code [1]. It is a replacement for the v25* patches.
>
> The make check and TAP subscription tests are all OK. I have repeated
> the performance tests [2] and those results are good too.
>
> v26-0001 <--- v23 (base RF patch)
> v26-0002 <--- ExprState cache mods (refactored row filter caching)
> v26-0003 <--- ExprState cache extra debug logging (temp)

Peter, I'm still reviewing this new cache mechanism. I will provide feedback as soon as I have integrated it into this latest version.
I'm attaching a new version that simply incorporates Houzj's review [1]. It is based on v23.

There has been a discussion about which row version should be used by the row filter. We don't have a consensus, so I think it is prudent to provide a way for the user to change it. I suggested in a previous email [2] that a publication option should be added; the row filter could then be applied to the old tuple, the new tuple, or both. This approach is simpler than using OLD/NEW references (less code, and it avoids validation such as rejecting a NEW reference for DELETEs or an OLD reference for INSERTs). I thought about a reasonable default, and the _new_ tuple seems to be a good one because (i) it is always available and (ii) the user doesn't have to discover that replication is broken because of a column that is not part of the replica identity. I'm attaching a POC that implements it; I'm still polishing it. Adding tests for multiple row filters and integrating Peter's caching mechanism [3] are the next steps.

[1] https://www.postgresql.org/message-id/OS0PR01MB571696CA853B3655F7DE752994E29%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[2] https://www.postgresql.org/message-id/5a3f74df-ffa1-4126-a5d8-dbb081d3e439%40www.fastmail.com
[3] https://www.postgresql.org/message-id/CAHut%2BPsgRHymwLhJ9t3By6%2BKNaVDzfjf6Y4Aq%3DJRD-y8t1mEFg%40mail.gmail.com

--
Euler Taveira
EDB   https://www.enterprisedb.com/
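To make the syntax under discussion concrete, here is a minimal sketch of the row-filter DDL as implemented by the attached patch (the table and publication names are only illustrative):

-- Publish only rows that satisfy the filter; parentheses are required.
CREATE TABLE departments (id int PRIMARY KEY, name text, active boolean);
CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);

-- The filter can be added or replaced later.
ALTER PUBLICATION active_departments SET TABLE departments WHERE (active IS TRUE AND id < 1000);

With this in place, only matching rows are copied during initial synchronization and replicated afterwards; since DELETEs are evaluated against the old row version, filter columns should normally be part of the primary key or replica identity.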
From 018cdb79733ddf4f0de1e4eace3a172bd685d53c Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 12:07:51 -0300
Subject: [PATCH v27 1/2] Row filter for logical replication

This feature adds a row filter for publication tables. When a publication is defined or modified, rows that don't satisfy an optional WHERE clause are filtered out. This allows a database or set of tables to be partially replicated. The row filter is per table. A new row filter can be added simply by specifying a WHERE clause after the table name; the WHERE clause must be enclosed in parentheses.

The WHERE clause should probably contain only columns that are part of the primary key or covered by REPLICA IDENTITY. Otherwise, any DELETEs won't be replicated, because DELETE uses the old row version (which is limited to the primary key or REPLICA IDENTITY columns) to evaluate the row filter. INSERT and UPDATE use the new row version, so any column can be used. If the row filter evaluates to NULL, it is treated as false. For simplicity, functions are not allowed; this could possibly be addressed in a future patch.

If you choose to do the initial table synchronization, only data that satisfies the row filters is sent. If the subscription has several publications in which a table has been published with different WHERE clauses, rows must satisfy all expressions to be copied. If the subscriber is a pre-15 version, data synchronization won't use row filters even if they are defined on the publisher, since previous versions cannot handle them.

If your publication contains a partitioned table, the publication parameter publish_via_partition_root determines whether the partition's row filter (when the parameter is false, the default) or the root partitioned table's row filter is used.
--- doc/src/sgml/catalogs.sgml | 8 + doc/src/sgml/ref/alter_publication.sgml | 11 +- doc/src/sgml/ref/create_publication.sgml | 33 ++- doc/src/sgml/ref/create_subscription.sgml | 10 +- src/backend/catalog/pg_publication.c | 47 ++- src/backend/commands/publicationcmds.c | 101 ++++--- src/backend/nodes/copyfuncs.c | 14 + src/backend/nodes/equalfuncs.c | 12 + src/backend/parser/gram.y | 24 +- src/backend/parser/parse_agg.c | 10 + src/backend/parser/parse_expr.c | 21 +- src/backend/parser/parse_func.c | 3 + src/backend/parser/parse_oper.c | 7 + src/backend/replication/logical/tablesync.c | 95 ++++++- src/backend/replication/pgoutput/pgoutput.c | 257 ++++++++++++++++- src/bin/pg_dump/pg_dump.c | 24 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 15 +- src/include/catalog/pg_publication.h | 9 +- src/include/catalog/pg_publication_rel.h | 6 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 11 +- src/include/parser/parse_node.h | 1 + src/test/regress/expected/publication.out | 71 +++++ src/test/regress/sql/publication.sql | 32 +++ src/test/subscription/t/025_row_filter.pl | 300 ++++++++++++++++++++ 26 files changed, 1048 insertions(+), 76 deletions(-) create mode 100644 src/test/subscription/t/025_row_filter.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 2b2c70a26e..d473af1b7b 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6231,6 +6231,14 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l Reference to relation </para></entry> </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>prqual</structfield> <type>pg_node_tree</type> + </para> + <para>Expression tree (in <function>nodeToString()</function> + representation) for the relation's qualifying condition</para></entry> + </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index faa114b2c6..4bb4314458 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -21,8 +21,8 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... 
] ) ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } @@ -92,7 +92,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r table name, only that table is affected. If <literal>ONLY</literal> is not specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table - name to explicitly indicate that descendant tables are included. + name to explicitly indicate that descendant tables are included. If the + optional <literal>WHERE</literal> clause is specified, rows that do not + satisfy the <replaceable class="parameter">expression</replaceable> will + not be published. Note that parentheses are required around the + expression. The <replaceable class="parameter">expression</replaceable> + is evaluated with the role used for the replication connection. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index ff82fbca55..8f78fbbd90 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE PUBLICATION <replaceable class="parameter">name</replaceable> - [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] + [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] | FOR ALL TABLES ] [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> @@ -71,6 +71,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> This does not apply to a partitioned table, however. The partitions of a partitioned table are always implicitly considered part of the publication, so they are never explicitly added to the publication. + If the optional <literal>WHERE</literal> clause is specified, rows that do + not satisfy the <replaceable class="parameter">expression</replaceable> + will not be published. Note that parentheses are required around the + expression. It has no effect on <literal>TRUNCATE</literal> commands. </para> <para> @@ -182,6 +186,21 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> disallowed on those tables. </para> + <para> + The <literal>WHERE</literal> clause should contain only columns that are + part of the primary key or be covered by <literal>REPLICA + IDENTITY</literal> otherwise, <command>DELETE</command> operations will not + be replicated. That's because old row is used and it only contains primary + key or columns that are part of the <literal>REPLICA IDENTITY</literal>; the + remaining columns are <literal>NULL</literal>. For <command>INSERT</command> + and <command>UPDATE</command> operations, any column might be used in the + <literal>WHERE</literal> clause. New row is used and it contains all + columns. A <literal>NULL</literal> value causes the expression to evaluate + to false; avoid using columns without not-null constraints in the + <literal>WHERE</literal> clause. The <literal>WHERE</literal> clause does + not allow functions or user-defined operators. + </para> + <para> For an <command>INSERT ... 
ON CONFLICT</command> command, the publication will publish the operation that actually results from the command. So depending @@ -197,6 +216,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <para> <acronym>DDL</acronym> operations are not published. </para> + + <para> + The <literal>WHERE</literal> clause expression is executed with the role used + for the replication connection. + </para> </refsect1> <refsect1> @@ -209,6 +233,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments; </programlisting> </para> + <para> + Create a publication that publishes all changes from active departments: +<programlisting> +CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE); +</programlisting> + </para> + <para> Create a publication that publishes all changes in all tables: <programlisting> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 702934eba1..94e398133f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -102,7 +102,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl <para> Specifies whether the existing data in the publications that are being subscribed to should be copied once the replication starts. - The default is <literal>true</literal>. + The default is <literal>true</literal>. If any table in the + publications has a <literal>WHERE</literal> clause, rows that do not + satisfy the <replaceable class="parameter">expression</replaceable> + will not be copied. If the subscription has several publications in + which a table has been published with different + <literal>WHERE</literal> clauses, rows must satisfy all expressions + to be copied. If the subscriber is a + <productname>PostgreSQL</productname> version before 15 then any row + filtering is ignored. </para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2a2fe03c13..6057fc3220 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -33,6 +33,9 @@ #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -141,21 +144,27 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationInfo *pri, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); + Relation targetrel = pri->relation; + Oid relid; Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ParseState *pstate; + ParseNamespaceItem *nsitem; + Node *whereclause = NULL; rel = table_open(PublicationRelRelationId, RowExclusiveLock); + relid = RelationGetRelid(targetrel); + /* * Check for duplicates. 
Note that this does not really prevent * duplicates, it's here just to provide nicer error message in common @@ -177,6 +186,26 @@ publication_add_relation(Oid pubid, Relation targetrel, check_publication_add_relation(targetrel); + if (pri->whereClause != NULL) + { + /* Set up a pstate to parse with */ + pstate = make_parsestate(NULL); + pstate->p_sourcetext = nodeToString(pri->whereClause); + + nsitem = addRangeTableEntryForRelation(pstate, targetrel, + AccessShareLock, + NULL, false, false); + addNSItemToQuery(pstate, nsitem, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(pri->whereClause), + EXPR_KIND_PUBLICATION_WHERE, + "PUBLICATION"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); + } + /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -189,6 +218,12 @@ publication_add_relation(Oid pubid, Relation targetrel, values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + /* Add qualifications, if available */ + if (whereclause) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -205,6 +240,14 @@ publication_add_relation(Oid pubid, Relation targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (whereclause) + { + recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL); + + free_parsestate(pstate); + } + /* Close the table. */ table_close(rel, RowExclusiveLock); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..0df7ffbe54 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -384,31 +384,24 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; - /* Calculate which relations to drop. */ + /* + * Remove all publication-table mappings. We could possibly remove (i) + * tables that are not found in the new table list and (ii) tables that + * are being re-added with a different qual expression. For (ii), + * simply updating the existing tuple is not enough, because of qual + * expression dependencies. + */ foreach(oldlc, oldrelids) { Oid oldrelid = lfirst_oid(oldlc); - ListCell *newlc; - bool found = false; + PublicationRelationInfo *oldrel; - foreach(newlc, rels) - { - Relation newrel = (Relation) lfirst(newlc); - - if (RelationGetRelid(newrel) == oldrelid) - { - found = true; - break; - } - } - - if (!found) - { - Relation oldrel = table_open(oldrelid, - ShareUpdateExclusiveLock); - - delrels = lappend(delrels, oldrel); - } + oldrel = palloc(sizeof(PublicationRelationInfo)); + oldrel->relid = oldrelid; + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrel->relid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); } /* And drop them. */ @@ -498,7 +491,8 @@ RemovePublicationRelById(Oid proid) } /* - * Open relations specified by a RangeVar list. + * Open relations specified by a RangeVar list (PublicationTable or Relation). + * * The returned tables are locked in ShareUpdateExclusiveLock mode in order to * add them to a publication. 
*/ @@ -508,16 +502,38 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + PublicationRelationInfo *pri; /* * Open, share-lock, and check all the explicitly-specified relations */ foreach(lc, tables) { - RangeVar *rv = lfirst_node(RangeVar, lc); - bool recurse = rv->inh; + PublicationTable *t = NULL; + RangeVar *rv; + bool recurse; Relation rel; Oid myrelid; + bool whereclause; + + /* + * ALTER PUBLICATION ... ADD TABLE provides a PublicationTable List + * (Relation, Where clause). ALTER PUBLICATION ... DROP TABLE provides + * a Relation List. Check the List element to be used. + */ + whereclause = IsA(lfirst(lc), PublicationTable); + + if (whereclause) + { + t = lfirst(lc); + rv = castNode(RangeVar, t->relation); + } + else + { + rv = lfirst_node(RangeVar, lc); + } + + recurse = rv->inh; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); @@ -537,8 +553,11 @@ OpenTableList(List *tables) table_close(rel, ShareUpdateExclusiveLock); continue; } - - rels = lappend(rels, rel); + pri = palloc(sizeof(PublicationRelationInfo)); + pri->relid = myrelid; + pri->relation = rel; + pri->whereClause = whereclause ? t->whereClause : NULL; + rels = lappend(rels, pri); relids = lappend_oid(relids, myrelid); /* @@ -571,7 +590,12 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + pri = palloc(sizeof(PublicationRelationInfo)); + pri->relid = childrelid; + pri->relation = rel; + /* child inherits WHERE clause from parent */ + pri->whereClause = whereclause ? t->whereClause : NULL; + rels = lappend(rels, pri); relids = lappend_oid(relids, childrelid); } } @@ -592,10 +616,12 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc); - table_close(rel, NoLock); + table_close(pri->relation, NoLock); } + + list_free_deep(rels); } /* @@ -611,15 +637,15 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. 
*/ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), - RelationGetRelationName(rel)); + if (!pg_class_ownercheck(pri->relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(pri->relation->rd_rel->relkind), + RelationGetRelationName(pri->relation)); - obj = publication_add_relation(pubid, rel, if_not_exists); + obj = publication_add_relation(pubid, pri, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -643,11 +669,10 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); - Oid relid = RelationGetRelid(rel); + PublicationRelationInfo *pri = (PublicationRelationInfo *) lfirst(lc); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, - ObjectIdGetDatum(relid), + ObjectIdGetDatum(pri->relid), ObjectIdGetDatum(pubid)); if (!OidIsValid(prid)) { @@ -657,7 +682,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" is not part of the publication", - RelationGetRelationName(rel)))); + RelationGetRelationName(pri->relation)))); } ObjectAddressSet(obj, PublicationRelRelationId, prid); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 38251c2b8e..291c01bd94 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4828,6 +4828,17 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) return newnode; } +static PublicationTable * +_copyPublicationTable(const PublicationTable *from) +{ + PublicationTable *newnode = makeNode(PublicationTable); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(whereClause); + + return newnode; +} + static CreateSubscriptionStmt * _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from) { @@ -5692,6 +5703,9 @@ copyObjectImpl(const void *from) case T_AlterPublicationStmt: retval = _copyAlterPublicationStmt(from); break; + case T_PublicationTable: + retval = _copyPublicationTable(from); + break; case T_CreateSubscriptionStmt: retval = _copyCreateSubscriptionStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8a1762000c..3ec66c48af 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2314,6 +2314,15 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, return true; } +static bool +_equalPublicationTable(const PublicationTable *a, const PublicationTable *b) +{ + COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(whereClause); + + return true; +} + static bool _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a, const CreateSubscriptionStmt *b) @@ -3700,6 +3709,9 @@ equal(const void *a, const void *b) case T_AlterPublicationStmt: retval = _equalAlterPublicationStmt(a, b); break; + case T_PublicationTable: + retval = _equalPublicationTable(a, b); + break; case T_CreateSubscriptionStmt: retval = _equalCreateSubscriptionStmt(a, b); break; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..96c42d8aec 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list publication_table_list 
%type <node> opt_routine_body %type <groupclause> group_clause %type <list> group_by_list %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_publication_for_tables publication_for_tables publication_table_elem %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -9620,7 +9620,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9651,7 +9651,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9659,7 +9659,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9677,6 +9677,20 @@ AlterPublicationStmt: } ; +publication_table_list: + publication_table_elem { $$ = list_make1($1); } + | publication_table_list ',' publication_table_elem { $$ = lappend($1, $3); } + ; + +publication_table_elem: relation_expr OptWhereClause + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->whereClause = $2; + $$ = (Node *) n; + } + ; + /***************************************************************************** * * CREATE SUBSCRIPTION name ... diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 7d829a05a9..193c87d8b7 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -551,6 +551,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) err = _("grouping operations are not allowed in COPY FROM WHERE conditions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + if (isAgg) + err = _("aggregate functions are not allowed in publication WHERE expressions"); + else + err = _("grouping operations are not allowed in publication WHERE expressions"); + + break; case EXPR_KIND_CYCLE_MARK: errkind = true; @@ -943,6 +950,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_CYCLE_MARK: errkind = true; break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("window functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index f928c32311..321050660e 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -205,8 +205,19 @@ transformExprRecurse(ParseState *pstate, Node *expr) break; case T_FuncCall: - result = transformFuncCall(pstate, (FuncCall *) expr); - break; + { + /* + * Forbid functions in publication WHERE condition + */ + if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions are not allowed in publication WHERE expressions"), + parser_errposition(pstate, exprLocation(expr)))); + + result = transformFuncCall(pstate, (FuncCall *) expr); + break; + } case T_MultiAssignRef: result = transformMultiAssignRef(pstate, (MultiAssignRef *) expr); @@ -509,6 +520,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_COPY_WHERE: case 
EXPR_KIND_GENERATED_COLUMN: case EXPR_KIND_CYCLE_MARK: + case EXPR_KIND_PUBLICATION_WHERE: /* okay */ break; @@ -1769,6 +1781,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink) case EXPR_KIND_GENERATED_COLUMN: err = _("cannot use subquery in column generation expression"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("cannot use subquery in publication WHERE expression"); + break; /* * There is intentionally no default: case here, so that the @@ -3089,6 +3104,8 @@ ParseExprKindName(ParseExprKind exprKind) return "GENERATED AS"; case EXPR_KIND_CYCLE_MARK: return "CYCLE"; + case EXPR_KIND_PUBLICATION_WHERE: + return "publication expression"; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 3cec8de7da..e946f17c64 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2655,6 +2655,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) case EXPR_KIND_CYCLE_MARK: errkind = true; break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("set-returning functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_oper.c b/src/backend/parser/parse_oper.c index bc34a23afc..29f8835ce1 100644 --- a/src/backend/parser/parse_oper.c +++ b/src/backend/parser/parse_oper.c @@ -718,6 +718,13 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, opform->oprright)), parser_errposition(pstate, location))); + /* Check it's not a custom operator for publication WHERE expressions */ + if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && opform->oid >= FirstNormalObjectId) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("user-defined operators are not allowed in publication WHERE expressions"), + parser_errposition(pstate, location))); + /* Do typecasting and build the expression tree */ if (ltree == NULL) { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..9d86a10594 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -688,19 +688,23 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in COPY command. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[] = {TEXTOID}; bool isnull; int natt; + ListCell *lc; + bool first; lrel->nspname = nspname; lrel->relname = relname; @@ -796,6 +800,59 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->natts = natt; walrcv_clear_result(res); + + /* + * Get relation qual. DISTINCT avoids the same expression of a table in + * multiple publications from being included multiple times in the final + * expression. 
+ */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT pg_get_expr(prqual, prrelid) " + " FROM pg_publication p " + " INNER JOIN pg_publication_rel pr " + " ON (p.oid = pr.prpubid) " + " WHERE pr.prrelid = %u " + " AND p.pubname IN (", lrel->remoteid); + + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + pfree(cmd.data); } @@ -809,6 +866,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -817,7 +875,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -828,14 +886,18 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - if (lrel.relkind == RELKIND_RELATION) + + /* Regular table with no row filter */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL) appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); else { /* * For non-tables, we need to do COPY (SELECT ...), but we can't just - * do SELECT * because we need to not copy generated columns. + * do SELECT * because we need to not copy generated columns. For + * tables with any row filters, build a SELECT query with AND'ed row + * filters for COPY. 
*/ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) @@ -844,8 +906,29 @@ copy_table(Relation rel) if (i < lrel.natts - 1) appendStringInfoString(&cmd, ", "); } - appendStringInfo(&cmd, " FROM %s) TO STDOUT", + appendStringInfo(&cmd, " FROM %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* list of AND'ed filters */ + if (qual != NIL) + { + ListCell *lc; + bool first = true; + + appendStringInfoString(&cmd, " WHERE "); + foreach(lc, qual) + { + char *q = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, " AND "); + appendStringInfoString(&cmd, q); + } + list_free_deep(qual); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..1220203af7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -13,18 +13,27 @@ #include "postgres.h" #include "access/tupconvert.h" +#include "access/xact.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" #include "commands/defrem.h" +#include "executor/executor.h" #include "fmgr.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/optimizer.h" +#include "parser/parse_coerce.h" #include "replication/logical.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -114,6 +123,8 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; + List *exprstate; /* ExprState for row filter */ + TupleTableSlot *scantuple; /* tuple table slot for row filter */ /* * OID of the relation to publish changes as. For a partition, this may @@ -137,7 +148,7 @@ static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -146,6 +157,13 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); +/* row filter routines */ +static EState *create_estate_for_relation(Relation rel); +static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); +static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry); + /* * Specify output plugin callbacks */ @@ -620,6 +638,149 @@ send_relation_and_attrs(Relation relation, TransactionId xid, OutputPluginWrite(ctx, false); } +/* + * Executor state preparation for evaluation of row filter expressions for the + * specified relation. 
+ */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + estate->es_output_cid = GetCurrentCommandId(false); + + return estate; +} + +/* + * Initialize for row filter expression execution. + */ +static ExprState * +pgoutput_row_filter_init_expr(Node *rfnode) +{ + ExprState *exprstate; + Oid exprtype; + Expr *expr; + MemoryContext oldctx; + + /* Prepare expression for execution */ + exprtype = exprType(rfnode); + expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1); + + if (expr == NULL) + ereport(ERROR, + (errcode(ERRCODE_CANNOT_COERCE), + errmsg("row filter returns type %s that cannot be coerced to the expected type %s", + format_type_be(exprtype), + format_type_be(BOOLOID)), + errhint("You will need to rewrite the row filter."))); + + /* + * Cache ExprState using CacheMemoryContext. This is the same code as + * ExecPrepareExpr() but that is not used because it doesn't use an EState. + * It should probably be another function in the executor to handle the + * execution outside a normal Plan tree context. + */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + expr = expression_planner(expr); + exprstate = ExecInitExpr(expr, NULL); + MemoryContextSwitchTo(oldctx); + + return exprstate; +} + +/* + * Evaluates row filter. + * + * If the row filter evaluates to NULL, it is taken as false i.e. the change + * isn't replicated. + */ +static bool +pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) +{ + Datum ret; + bool isnull; + + Assert(state != NULL); + + ret = ExecEvalExprSwitchContext(state, econtext, &isnull); + + elog(DEBUG3, "row filter evaluates to %s (isnull: %s)", + DatumGetBool(ret) ? "true" : "false", + isnull ? "true" : "false"); + + if (isnull) + return false; + + return DatumGetBool(ret); +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + ListCell *lc; + bool result = true; + + /* Bail out if there is no row filter */ + if (entry->exprstate == NIL) + return true; + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + PushActiveSnapshot(GetTransactionSnapshot()); + + estate = create_estate_for_relation(relation); + + /* Prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + ecxt->ecxt_scantuple = entry->scantuple; + + ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); + + /* + * If the subscription has multiple publications and the same table has a + * different row filter in these publications, all row filters must be + * matched in order to replicate this change. 
+ */ + foreach(lc, entry->exprstate) + { + ExprState *exprstate = (ExprState *) lfirst(lc); + + /* Evaluates row filter */ + result = pgoutput_row_filter_exec_expr(exprstate, ecxt); + + /* If the tuple does not match one of the row filters, bail out */ + if (!result) + break; + } + + /* Cleanup allocated resources */ + ResetExprContext(ecxt); + FreeExecutorState(estate); + PopActiveSnapshot(); + + return result; +} + /* * Sends the decoded DML over wire. * @@ -647,7 +808,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = change->txn->xid; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -671,8 +832,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, change, relation, relentry); - /* Send the data */ switch (change->action) { @@ -680,6 +839,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { HeapTuple tuple = &change->data.tp.newtuple->tuple; + /* Check row filter. */ + if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) + break; + + /* + * Schema should be sent before the logic that replaces the + * relation because it also sends the ancestor's relation. + */ + maybe_send_schema(ctx, change, relation, relentry); + /* Switch relation if publishing via root. */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { @@ -703,6 +872,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; + /* Check row filter. */ + if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) + break; + + maybe_send_schema(ctx, change, relation, relentry); + /* Switch relation if publishing via root. */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { @@ -731,6 +906,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + /* Check row filter. */ + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) + break; + + maybe_send_schema(ctx, change, relation, relentry); + /* Switch relation if publishing via root. */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { @@ -794,7 +975,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -1113,9 +1294,10 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) * when publishing. 
*/ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation relation) { RelationSyncEntry *entry; + Oid relid = RelationGetRelid(relation); bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; @@ -1138,6 +1320,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->replicate_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->scantuple = NULL; + entry->exprstate = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1149,6 +1333,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) List *pubids = GetRelationPublications(relid); ListCell *lc; Oid publish_as_relid = relid; + TupleDesc tupdesc = RelationGetDescr(relation); /* Reload publications if needed before use. */ if (!publications_valid) @@ -1162,6 +1347,22 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) publications_valid = true; } + /* Release tuple table slot */ + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + + /* + * Create a tuple table slot for row filter. TupleDesc must live as + * long as the cache remains. + */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + MemoryContextSwitchTo(oldctx); + /* * Build publication cache. We can't use one provided by relcache as * relcache considers all publications given relation is in, but here @@ -1171,6 +1372,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + HeapTuple rftuple; + Datum rfdatum; + bool rfisnull; if (pub->alltables) { @@ -1230,9 +1434,33 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + /* + * Cache row filter, if available. All publication-table mappings + * must be checked. If it is a partition and pubviaroot is true, + * use the row filter of the topmost partitioned table instead of + * the row filter of its own partition. 
+ */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) + { + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); + + if (!rfisnull) + { + Node *rfnode; + ExprState *exprstate; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + + /* Prepare for expression execution */ + exprstate = pgoutput_row_filter_init_expr(rfnode); + entry->exprstate = lappend(entry->exprstate, exprstate); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(rftuple); + } } list_free(pubids); @@ -1350,6 +1578,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) { HASH_SEQ_STATUS status; RelationSyncEntry *entry; + MemoryContext oldctx; /* * We can get here if the plugin was used in SQL interface as the @@ -1359,6 +1588,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) if (RelationSyncCache == NULL) return; + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + /* * There is no way to find which entry in our cache the hash belongs to so * mark the whole cache as invalid. @@ -1376,7 +1607,15 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + + if (entry->exprstate != NIL) + { + list_free_deep(entry->exprstate); + entry->exprstate = NIL; + } } + + MemoryContextSwitchTo(oldctx); } /* Send Replication origin */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 67be849829..1f19ae4384 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4140,6 +4140,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_oid; int i_prpubid; int i_prrelid; + int i_prrelqual; int i, j, ntups; @@ -4150,9 +4151,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) query = createPQExpBuffer(); /* Collect all publication membership info. 
*/ - appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid " - "FROM pg_catalog.pg_publication_rel"); + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, + "SELECT tableoid, oid, prpubid, prrelid, " + "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual " + "FROM pg_catalog.pg_publication_rel"); + else + appendPQExpBufferStr(query, + "SELECT tableoid, oid, prpubid, prrelid, " + "NULL AS prrelqual " + "FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4161,6 +4169,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_oid = PQfnumber(res, "oid"); i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); + i_prrelqual = PQfnumber(res, "prrelqual"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4201,6 +4210,10 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].dobj.name = tbinfo->dobj.name; pubrinfo[j].publication = pubinfo; pubrinfo[j].pubtable = tbinfo; + if (PQgetisnull(res, i, i_prrelqual)) + pubrinfo[j].pubrelqual = NULL; + else + pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual)); /* Decide whether we want to dump it */ selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout); @@ -4233,8 +4246,11 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", fmtId(pubinfo->dobj.name)); - appendPQExpBuffer(query, " %s;\n", + appendPQExpBuffer(query, " %s", fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrelqual) + appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual); + appendPQExpBufferStr(query, ";\n"); /* * There is no point in creating a drop query as the drop is done by table diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 29af845ece..f932a704eb 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -629,6 +629,7 @@ typedef struct _PublicationRelInfo DumpableObject dobj; PublicationInfo *publication; TableInfo *pubtable; + char *pubrelqual; } PublicationRelInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 90ff649be7..5f6418a572 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6329,8 +6329,15 @@ describePublications(const char *pattern) if (!puballtables) { printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname\n" - "FROM pg_catalog.pg_class c,\n" + "SELECT n.nspname, c.relname"); + if (pset.sversion >= 150000) + appendPQExpBufferStr(&buf, + ", pg_get_expr(pr.prqual, c.oid)"); + else + appendPQExpBufferStr(&buf, + ", NULL"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" @@ -6359,6 +6366,10 @@ describePublications(const char *pattern) PQgetvalue(tabres, j, 0), PQgetvalue(tabres, j, 1)); + if (!PQgetisnull(tabres, j, 2)) + appendPQExpBuffer(&buf, " WHERE (%s)", + PQgetvalue(tabres, j, 2)); + printTableAddFooter(&cont, buf.data); } PQclear(tabres); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..2703b9c3fe 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,6 +83,13 @@ typedef struct Publication PublicationActions pubactions; } Publication; +typedef struct PublicationRelationInfo +{ + Oid relid; + Relation relation; + Node 
*whereClause; +} PublicationRelationInfo; + extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); @@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *pri, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..154bb61777 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + pg_node_tree prqual; /* qualifications */ +#endif } FormData_pg_publication_rel; /* ---------------- @@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) */ typedef FormData_pg_publication_rel *Form_pg_publication_rel; +DECLARE_TOAST(pg_publication_rel, 8287, 8288); + DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops)); DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops)); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..56d13ff022 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -490,6 +490,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 7af13dee43..875b809099 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3625,12 +3625,19 @@ typedef struct AlterTSConfigurationStmt } AlterTSConfigurationStmt; +typedef struct PublicationTable +{ + NodeTag type; + RangeVar *relation; /* relation to be published */ + Node *whereClause; /* qualifications */ +} PublicationTable; + typedef struct CreatePublicationStmt { NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *tables; /* Optional list of PublicationTable to add */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3643,7 +3650,7 @@ typedef struct AlterPublicationStmt List *options; /* List of DefElem nodes */ /* parameters used for ALTER PUBLICATION ... 
ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + List *tables; /* List of PublicationTable to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ } AlterPublicationStmt; diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 1500de2dd0..4537543a7b 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -80,6 +80,7 @@ typedef enum ParseExprKind EXPR_KIND_COPY_WHERE, /* WHERE condition in COPY FROM */ EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */ EXPR_KIND_CYCLE_MARK, /* cycle mark value */ + EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */ } ParseExprKind; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4a5ef0bc24..319c6bc7d9 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -158,6 +158,77 @@ Tables: DROP TABLE testpub_parted1; DROP PUBLICATION testpub_forparted, testpub_forparted1; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_rf_tbl1" + "public.testpub_rf_tbl2" WHERE (((c <> 'test'::text) AND (d < 5))) + +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_rf_tbl1" + "public.testpub_rf_tbl2" WHERE (((c <> 'test'::text) AND (d < 5))) + "public.testpub_rf_tbl3" WHERE (((e > 1000) AND (e < 2000))) + +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_rf_tbl1" + "public.testpub_rf_tbl3" WHERE (((e > 1000) AND (e < 2000))) + +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_rf_tbl3" WHERE (((e > 300) AND (e < 500))) + +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +ERROR: functions are not allowed in publication WHERE expressions +LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ... 
+ ^ +-- fail - user-defined operators disallowed +CREATE FUNCTION testpub_rf_func(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL; +CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func, LEFTARG = integer, RIGHTARG = integer); +CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27); +ERROR: user-defined operators are not allowed in publication WHERE expressions +LINE 1: ...ICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27); + ^ +-- fail - WHERE not allowed in DROP +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27); +ERROR: syntax error at or near "WHERE" +LINE 1: ...R PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e <... + ^ +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; +DROP OPERATOR =#>(integer, integer); +DROP FUNCTION testpub_rf_func(integer, integer); -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: cannot add relation "testpub_view" to publication diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index d844075368..b1606cce7e 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -93,6 +93,38 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); DROP TABLE testpub_parted1; DROP PUBLICATION testpub_forparted, testpub_forparted1; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +\dRp+ testpub5 +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +\dRp+ testpub5 +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +\dRp+ testpub5 +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +\dRp+ testpub5 +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +-- fail - user-defined operators disallowed +CREATE FUNCTION testpub_rf_func(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL; +CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func, LEFTARG = integer, RIGHTARG = integer); +CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27); +-- fail - WHERE not allowed in DROP +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27); + +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; +DROP OPERATOR =#>(integer, integer); +DROP FUNCTION testpub_rf_func(integer, integer); + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; diff --git a/src/test/subscription/t/025_row_filter.pl b/src/test/subscription/t/025_row_filter.pl new file mode 100644 index 0000000000..6428f0da00 --- /dev/null +++ b/src/test/subscription/t/025_row_filter.pl @@ -0,0 +1,300 @@ +# Test logical replication behavior with row filtering +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# create publisher node +my $node_publisher = PostgresNode->new('publisher'); 
+$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)" +); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)" +); + +# setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)" +); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)" +); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)" +); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)" +); + +# setup logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')" +); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)" +); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3" +); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)" +); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned WHERE (a < 5000)" +); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)" +); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_not_used FOR TABLE tab_rowfilter_1 WHERE (a < 0)" +); + +# +# The following INSERTs are executed before the CREATE SUBSCRIPTION, so these +# SQL commands are for testing the initial data copy using logical replication. 
+# +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x" +); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"); + +# insert data into partitioned table and directly on the partition +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(7000, 101),(15000, 102),(5500, 300)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200),(6005, 201)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 103)"); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3" +); + +$node_publisher->wait_for_catchup($appname); + +# wait for initial table synchronization to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check expected replicated rows for tab_rowfilter_1 +# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered') +# - INSERT (1, 'not replicated') NO, because a is not > 1000 +# - INSERT (1500, 'filtered') NO, because b == 'filtered' +# - INSERT (1980, 'not filtered') YES +# - generate_series(990,1002) YES, only for 1001,1002 because a > 1000 +# +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); +is( $result, qq(1001|test 1001 +1002|test 1002 +1980|not filtered), 'check initial data copy from table tab_rowfilter_1'); + +# Check expected replicated rows for tab_rowfilter_2 +# tap_pub_1 filter is: (c % 2 = 0) +# tap_pub_2 filter is: (c % 3 = 0) +# When there are multiple publications for the same table, all filter +# expressions must be satisfied. In this case, rows are replicated only if c +# is divisible by both 2 and 3 (6, 12, 18). +# +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2"); +is($result, qq(3|6|18), 'check initial data copy from table tab_rowfilter_2'); + +# Check expected replicated rows for tab_rowfilter_3 +# There is no filter. 10 rows are inserted, so 10 rows are replicated.
+$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM tab_rowfilter_3"); +is($result, qq(10), 'check initial data copy from table tab_rowfilter_3'); + +# Check expected replicated rows for partitions +# publication option publish_via_partition_root is false so use the row filter +# from a partition +# tab_rowfilter_partitioned filter: (a < 5000) +# tab_rowfilter_less_10k filter: (a < 6000) +# tab_rowfilter_greater_10k filter: no filter +# +# INSERT into tab_rowfilter_partitioned: +# - INSERT (1,100) YES, because 1 < 6000 +# - INSERT (7000, 101) NO, because 7000 is not < 6000 +# - INSERT (15000, 102) YES, because tab_rowfilter_greater_10k has no filter +# - INSERT (5500, 300) YES, because 5500 < 6000 +# +# INSERT directly into tab_rowfilter_less_10k: +# - INSERT (2, 200) YES, because 2 < 6000 +# - INSERT (6005, 201) NO, because 6005 is not < 6000 +# +# INSERT directly into tab_rowfilter_greater_10k: +# - INSERT (16000, 103) YES, because tab_rowfilter_greater_10k has no filter +# +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2"); +is($result, qq(1|100 +2|200 +5500|300), 'check initial data copy from partition tab_rowfilter_less_10k'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2"); +is($result, qq(15000|102 +16000|103), 'check initial data copy from partition tab_rowfilter_greater_10k'); + +# The following commands are executed after CREATE SUBSCRIPTION, so these SQL +# commands are for testing normal logical replication behavior. +# +# test row filter (INSERT, UPDATE, DELETE) +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1600, 'test 1600')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1700, 'test 1700')"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_rowfilter_1 WHERE a = 1700"); + +$node_publisher->wait_for_catchup($appname); + +# Check expected replicated rows for tab_rowfilter_1 +# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered') +# +# - 1001, 1002, 1980 already exist from initial data copy +# - INSERT (800, 'test 800') NO, because 800 is not > 1000 +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' +# - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' +# - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' +# - UPDATE (1601, 'test 1601 updated') YES, because 1601 > 1000 and 'test 1601 updated' <> 'filtered' +# - DELETE (1700) NO, row filter contains column b that is not part of +# the PK or REPLICA IDENTITY and old tuple contains b = NULL, hence, row filter +# evaluates to false +# +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); +is($result, qq(1001|test 1001 +1002|test 1002 +1600|test 1600 +1601|test 1601 updated +1700|test 1700 +1980|not filtered), 'check replicated rows to 
table tab_rowfilter_1'); + +# Publish using root partitioned table +# Use a different partitioned table layout (exercise publish_via_partition_root) +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)"); +$node_subscriber->safe_psql('postgres', + "TRUNCATE TABLE tab_rowfilter_partitioned"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(14000, 1950)"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_less_10k SET b = 30 WHERE a = 4001"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_rowfilter_less_10k WHERE a = 4002"); + +$node_publisher->wait_for_catchup($appname); + +# Check expected replicated rows for partitions +# publication option publish_via_partition_root is true so use the row filter +# from the root partitioned table +# tab_rowfilter_partitioned filter: (a < 5000) +# tab_rowfilter_less_10k filter: (a < 6000) +# tab_rowfilter_greater_10k filter: no filter +# +# After TRUNCATE, REFRESH PUBLICATION, the initial data copy will apply the +# partitioned table row filter. +# - INSERT (1, 100) YES, 1 < 5000 +# - INSERT (7000, 101) NO, 7000 is not < 5000 +# - INSERT (15000, 102) NO, 15000 is not < 5000 +# - INSERT (5500, 300) NO, 5500 is not < 5000 +# - INSERT (2, 200) YES, 2 < 5000 +# - INSERT (6005, 201) NO, 6005 is not < 5000 +# - INSERT (16000, 103) NO, 16000 is not < 5000 +# +# Execute SQL commands after initial data copy for testing the logical +# replication behavior. +# - INSERT (4000, 400) YES, 4000 < 5000 +# - INSERT (4001, 401) YES, 4001 < 5000 +# - INSERT (4002, 402) YES, 4002 < 5000 +# - INSERT (4500, 450) YES, 4500 < 5000 +# - INSERT (5600, 123) NO, 5600 is not < 5000 +# - INSERT (14000, 1950) NO, 14000 is not < 5000 +# - UPDATE (4001) YES, 4001 < 5000 +# - DELETE (4002) YES, 4002 < 5000 +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2"); +is( $result, qq(1|100 +2|200 +4000|400 +4001|30 +4500|450), 'check publish_via_partition_root behavior'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.20.1
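To illustrate 0001 without wading through the diff, here is a minimal usage sketch. It only exercises the syntax added above; the table, publication, and subscription names are made up for this example and the connection string is a placeholder.

-- publisher
CREATE TABLE orders (id int PRIMARY KEY, status text);
CREATE PUBLICATION pub_orders FOR TABLE orders WHERE (status <> 'cancelled');

-- subscriber (same table definition; the filter lives only on the publisher)
CREATE TABLE orders (id int PRIMARY KEY, status text);
CREATE SUBSCRIPTION sub_orders
    CONNECTION 'host=publisher dbname=postgres'
    PUBLICATION pub_orders;

Note that this filter references status, which is not part of the replica identity, so DELETEs on orders are not replicated (the same situation 025_row_filter.pl exercises with tab_rowfilter_1); filtering only on primary key columns avoids that.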
From f00cfd18a7d10254a0539213e38129ed56823869 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Wed, 18 Aug 2021 19:32:23 -0300 Subject: [PATCH v27 2/2] publication parameter: row_filter_on_update This parameter controls which tuple is used to evaluate the expression provided by the WHERE clause on UPDATE operations. The allowed values are new and old. The default is new. This patch introduces a new per-entry List that contains row filter data. Hence, an entry might have two row filters with different row_filter_on_update settings, one evaluating the old tuple and the other the new tuple. --- doc/src/sgml/ref/create_publication.sgml | 12 ++ src/backend/catalog/pg_publication.c | 1 + src/backend/commands/publicationcmds.c | 44 ++++++- src/backend/replication/pgoutput/pgoutput.c | 132 ++++++++++++++------ src/include/catalog/pg_publication.h | 11 ++ src/test/regress/expected/publication.out | 3 + src/test/regress/sql/publication.sql | 2 + src/test/subscription/t/025_row_filter.pl | 65 +++++++++- 8 files changed, 224 insertions(+), 46 deletions(-) diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 8f78fbbd90..ba1eac08ce 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -146,6 +146,18 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> </para> </listitem> </varlistentry> + + <varlistentry> + <term><literal>row_filter_on_update</literal> (<type>string</type>)</term> + <listitem> + <para> + This parameter controls which tuple is used to evaluate the + expression for <literal>UPDATE</literal> operations. The allowed + values are <literal>new</literal> and <literal>old</literal>. The + default is to use the new tuple. + </para> + </listitem> + </varlistentry> </variablelist></para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 6057fc3220..77c4f83c7f 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -474,6 +474,7 @@ GetPublication(Oid pubid) pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + pub->pubrowfilterupd = pubform->pubrowfilterupd; ReleaseSysCache(tup); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0df7ffbe54..00373db654 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -60,12 +60,15 @@ parse_publication_options(ParseState *pstate, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, - bool *publish_via_partition_root) + bool *publish_via_partition_root, + bool *row_filter_on_update_given, + char *row_filter_on_update) { ListCell *lc; *publish_given = false; *publish_via_partition_root_given = false; + *row_filter_on_update_given = false; /* defaults */ pubactions->pubinsert = true; @@ -73,6 +76,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; pubactions->pubtruncate = true; *publish_via_partition_root = false; + *row_filter_on_update = PUB_ROW_FILTER_UPD_NEW_TUPLE; /* Parse options */ foreach(lc, options) @@ -131,6 +135,24 @@ parse_publication_options(ParseState *pstate, *publish_via_partition_root_given = true; *publish_via_partition_root = defGetBoolean(defel); } + else if (strcmp(defel->defname, "row_filter_on_update") == 0) + { + char *rowfilterupd; + + if (*row_filter_on_update_given) + 
errorConflictingDefElem(defel, pstate); + *row_filter_on_update_given = true; + rowfilterupd = defGetString(defel); + + if (strcmp(rowfilterupd, "new") == 0) + *row_filter_on_update = PUB_ROW_FILTER_UPD_NEW_TUPLE; + else if (strcmp(rowfilterupd, "old") == 0) + *row_filter_on_update = PUB_ROW_FILTER_UPD_OLD_TUPLE; + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized \"row_filter_on_update\" value: \"%s\"", rowfilterupd))); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -154,6 +176,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) PublicationActions pubactions; bool publish_via_partition_root_given; bool publish_via_partition_root; + bool row_filter_on_update_given; + char row_filter_on_update; AclResult aclresult; /* must have CREATE privilege on database */ @@ -193,7 +217,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) stmt->options, &publish_given, &pubactions, &publish_via_partition_root_given, - &publish_via_partition_root); + &publish_via_partition_root, + &row_filter_on_update_given, + &row_filter_on_update); puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, Anum_pg_publication_oid); @@ -210,6 +236,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubtruncate); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + values[Anum_pg_publication_pubrowfilterupd - 1] = + CharGetDatum(row_filter_on_update); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -264,6 +292,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, PublicationActions pubactions; bool publish_via_partition_root_given; bool publish_via_partition_root; + bool row_filter_on_update_given; + char row_filter_on_update; ObjectAddress obj; Form_pg_publication pubform; @@ -271,7 +301,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, stmt->options, &publish_given, &pubactions, &publish_via_partition_root_given, - &publish_via_partition_root); + &publish_via_partition_root, + &row_filter_on_update_given, + &row_filter_on_update); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -299,6 +331,12 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, replaces[Anum_pg_publication_pubviaroot - 1] = true; } + if (row_filter_on_update_given) + { + values[Anum_pg_publication_pubrowfilterupd - 1] = CharGetDatum(row_filter_on_update); + replaces[Anum_pg_publication_pubrowfilterupd - 1] = true; + } + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1220203af7..5313cd0b78 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -95,6 +95,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); +/* + * One relation can have multiple row filters. This structure has data for each + * row filter including an ExprState and TupleTableSlot for cache purposes and + * also a variable that indicates which tuple the row filter uses for UPDATE + * actions. + */ +typedef struct RowFilterState +{ + ExprState *exprstate; + TupleTableSlot *scantuple; + char row_filter_on_update; +} RowFilterState; + /* * Entry in the map used to remember which relation schemas we sent. 
* @@ -123,8 +136,7 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; - List *exprstate; /* ExprState for row filter */ - TupleTableSlot *scantuple; /* tuple table slot for row filter */ + List *rfstate; /* row filter list */ /* * OID of the relation to publish changes as. For a partition, this may @@ -161,7 +173,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, static EState *create_estate_for_relation(Relation rel); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(Relation relation, int action, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); /* @@ -731,15 +743,14 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * If it returns true, the change is replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter(Relation relation, int action, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) { EState *estate; - ExprContext *ecxt; ListCell *lc; bool result = true; /* Bail out if there is no row filter */ - if (entry->exprstate == NIL) + if (entry->rfstate == NIL) return true; elog(DEBUG3, "table \"%s.%s\" has row filter", @@ -750,31 +761,62 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R estate = create_estate_for_relation(relation); - /* Prepare context per tuple */ - ecxt = GetPerTupleExprContext(estate); - ecxt->ecxt_scantuple = entry->scantuple; - - ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); - /* * If the subscription has multiple publications and the same table has a * different row filter in these publications, all row filters must be * matched in order to replicate this change. */ - foreach(lc, entry->exprstate) + foreach(lc, entry->rfstate) { - ExprState *exprstate = (ExprState *) lfirst(lc); + ExprContext *ecxt; + RowFilterState *rfstate = (RowFilterState *) lfirst(lc); + + /* Bail out if row_filter_on_update = old and old tuple is NULL */ + if (action == REORDER_BUFFER_CHANGE_UPDATE && oldtuple == NULL && + rfstate->row_filter_on_update == PUB_ROW_FILTER_UPD_OLD_TUPLE) + return false; + + /* Prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + ecxt->ecxt_scantuple = rfstate->scantuple; + + /* + * Choose which tuple to use for row filter. + * - INSERT: uses new tuple. + * - UPDATE: it can use new tuple or old tuple. The behavior is controlled + * by the publication parameter row_filter_on_update. The default is new + * tuple. + * - DELETE: uses old tuple. 
+ */ + switch (action) + { + case REORDER_BUFFER_CHANGE_INSERT: + ExecStoreHeapTuple(newtuple, ecxt->ecxt_scantuple, false); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + if (rfstate->row_filter_on_update == PUB_ROW_FILTER_UPD_NEW_TUPLE) + ExecStoreHeapTuple(newtuple, ecxt->ecxt_scantuple, false); + else if (rfstate->row_filter_on_update == PUB_ROW_FILTER_UPD_OLD_TUPLE) + ExecStoreHeapTuple(oldtuple, ecxt->ecxt_scantuple, false); + else + Assert(false); + break; + case REORDER_BUFFER_CHANGE_DELETE: + ExecStoreHeapTuple(oldtuple, ecxt->ecxt_scantuple, false); + break; + } /* Evaluates row filter */ - result = pgoutput_row_filter_exec_expr(exprstate, ecxt); + result = pgoutput_row_filter_exec_expr(rfstate->exprstate, ecxt); /* If the tuple does not match one of the row filters, bail out */ if (!result) break; + + ResetExprContext(ecxt); } /* Cleanup allocated resources */ - ResetExprContext(ecxt); FreeExecutorState(estate); PopActiveSnapshot(); @@ -840,7 +882,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(relation, change->action, NULL, tuple, relentry)) break; /* @@ -873,7 +915,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple newtuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter(relation, change->action, oldtuple, newtuple, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -907,7 +949,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(relation, change->action, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1320,8 +1362,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->replicate_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; - entry->scantuple = NULL; - entry->exprstate = NIL; + entry->rfstate = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1347,21 +1388,26 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* Release tuple table slot */ - if (entry->scantuple != NULL) + foreach(lc, entry->rfstate) { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; - } + RowFilterState *rfstate = (RowFilterState *) lfirst(lc); - /* - * Create a tuple table slot for row filter. TupleDesc must live as - * long as the cache remains. - */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - tupdesc = CreateTupleDescCopy(tupdesc); - entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); - MemoryContextSwitchTo(oldctx); + /* Release tuple table slot */ + if (rfstate->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(rfstate->scantuple); + rfstate->scantuple = NULL; + } + + /* + * Create a tuple table slot for row filter. TupleDesc must live as + * long as the cache remains. 
+ */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + rfstate->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + MemoryContextSwitchTo(oldctx); + } /* * Build publication cache. We can't use one provided by relcache as @@ -1447,15 +1493,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (!rfisnull) { - Node *rfnode; - ExprState *exprstate; + Node *rfnode; + RowFilterState *rfstate; oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfstate = palloc0(sizeof(RowFilterState)); rfnode = stringToNode(TextDatumGetCString(rfdatum)); /* Prepare for expression execution */ - exprstate = pgoutput_row_filter_init_expr(rfnode); - entry->exprstate = lappend(entry->exprstate, exprstate); + rfstate->exprstate = pgoutput_row_filter_init_expr(rfnode); + rfstate->row_filter_on_update = pub->pubrowfilterupd; + tupdesc = CreateTupleDescCopy(tupdesc); + rfstate->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + entry->rfstate = lappend(entry->rfstate, rfstate); MemoryContextSwitchTo(oldctx); } @@ -1608,10 +1658,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - if (entry->exprstate != NIL) + if (entry->rfstate != NIL) { - list_free_deep(entry->exprstate); - entry->exprstate = NIL; + list_free_deep(entry->rfstate); + entry->rfstate = NIL; } } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 2703b9c3fe..e68591c1fd 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -54,6 +54,12 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + + /* + * Choose which tuple to use for row filter on UPDATE actions. + * See constants below. + */ + char pubrowfilterupd; } FormData_pg_publication; /* ---------------- @@ -81,8 +87,13 @@ typedef struct Publication bool alltables; bool pubviaroot; PublicationActions pubactions; + char pubrowfilterupd; } Publication; +/* pubrowfilterupd values */ +#define PUB_ROW_FILTER_UPD_NEW_TUPLE 'n' /* use new tuple */ +#define PUB_ROW_FILTER_UPD_OLD_TUPLE 'o' /* use old tuple */ + typedef struct PublicationRelationInfo { Oid relid; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 319c6bc7d9..1a4ba78033 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -29,6 +29,8 @@ CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publis ERROR: conflicting or redundant options LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi... 
^ +CREATE PUBLICATION testpub_xxx WITH (row_filter_on_update = 'foo'); +ERROR: unrecognized "row_filter_on_update" value: "foo" \dRp List of publications Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root @@ -205,6 +207,7 @@ ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500) Tables: "public.testpub_rf_tbl3" WHERE (((e > 300) AND (e < 500))) +ALTER PUBLICATION testpub5 SET (row_filter_on_update = 'new'); -- fail - functions disallowed ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); ERROR: functions are not allowed in publication WHERE expressions diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index b1606cce7e..352d4482b1 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -24,6 +24,7 @@ ALTER PUBLICATION testpub_default SET (publish = update); CREATE PUBLICATION testpub_xxx WITH (foo); CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0'); +CREATE PUBLICATION testpub_xxx WITH (row_filter_on_update = 'foo'); \dRp @@ -108,6 +109,7 @@ ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); \dRp+ testpub5 +ALTER PUBLICATION testpub5 SET (row_filter_on_update = 'new'); -- fail - functions disallowed ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); -- fail - user-defined operators disallowed diff --git a/src/test/subscription/t/025_row_filter.pl b/src/test/subscription/t/025_row_filter.pl index 6428f0da00..3ad5de024c 100644 --- a/src/test/subscription/t/025_row_filter.pl +++ b/src/test/subscription/t/025_row_filter.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 10; # create publisher node my $node_publisher = PostgresNode->new('publisher'); @@ -22,6 +22,8 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab_rowfilter_2 (c int primary key)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_4 (a int, b integer, primary key(a, b))"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)" ); @@ -44,6 +46,8 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rowfilter_2 (c int primary key)"); $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_4 (a int, b integer, primary key(a, b))"); $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)" ); @@ -82,6 +86,9 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)" ); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_4 FOR TABLE tab_rowfilter_4 WHERE (a < 10 AND b < 40)" +); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub_not_used FOR TABLE tab_rowfilter_1 WHERE (a < 0)" ); @@ -103,6 +110,8 @@ $node_publisher->safe_psql('postgres', "INSERT INTO 
tab_rowfilter_2 (c) SELECT generate_series(1, 20)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_4 (a, b) SELECT i, i + 30 FROM generate_series(1, 6) i"); # insert data into partitioned table and directly on the partition $node_publisher->safe_psql('postgres', @@ -115,7 +124,7 @@ $node_publisher->safe_psql('postgres', my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4" ); $node_publisher->wait_for_catchup($appname); @@ -159,6 +168,13 @@ $result = "SELECT count(a) FROM tab_rowfilter_3"); is($result, qq(10), 'check initial data copy from table tab_rowfilter_3'); +# Check expected replicated rows for tab_rowfilter_4 +# tap_pub_4 filter is: (a < 10 AND b < 40) +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(a), min(a), max(a), min(b), max(b) FROM tab_rowfilter_4"); +is($result, qq(6|1|6|31|36), 'check initial data copy from table tab_rowfilter_4'); + # Check expected replicated rows for partitions # publication option publish_via_partition_root is false so use the row filter # from a partition @@ -210,6 +226,11 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_rowfilter_1 WHERE a = 1700"); +# publication parameter: row_filter_on_update = new +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_4 SET a = 7, b = 37 WHERE a = 1 AND b = 31"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_4 SET a = 102, b = 132 WHERE a = 2 AND b = 32"); $node_publisher->wait_for_catchup($appname); @@ -237,6 +258,46 @@ is($result, qq(1001|test 1001 1700|test 1700 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); +# Check expected replicated rows for tab_rowfilter_4 +# tap_pub_4 filter is: (a < 10 AND b < 40) +# +# - UPDATE (7, 37) YES, uses new tuple for row filter +# - UPDATE (102, 132) NO, uses new tuple for row filter +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_4 ORDER BY 1, 2"); +is($result, qq(2|32 +3|33 +4|34 +5|35 +6|36 +7|37), 'check replicated rows to table tab_rowfilter_4'); + +# publication parameter: row_filter_on_update = old +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_4 SET (row_filter_on_update = old)"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_4 SET a = 8, b = 38 WHERE a = 3 AND b = 33"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_4 SET a = 104, b = 134 WHERE a = 4 AND b = 34"); + +$node_publisher->wait_for_catchup($appname); + +# Check expected replicated rows for tab_rowfilter_4 +# tap_pub_4 filter is: (a < 10 AND b < 40) +# +# - UPDATE (8, 38) YES, uses old tuple for row filter +# - UPDATE (104, 134) YES, uses old tuple for row filter +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b FROM tab_rowfilter_4 ORDER BY 1, 2"); +is($result, qq(2|32 +5|35 +6|36 +7|37 +8|38 +104|134), 'check replicated rows to table tab_rowfilter_4'); + # Publish using root partitioned table # Use a 
different partitioned table layout (exercise publish_via_partition_root) $node_publisher->safe_psql('postgres', -- 2.20.1
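To make the 0002 semantics concrete, here is a short sketch of row_filter_on_update. The table and publication names are hypothetical; the behavior is the one exercised by tab_rowfilter_4 in the TAP test above.

CREATE TABLE t (a int, b int, PRIMARY KEY (a, b));
CREATE PUBLICATION pub_t FOR TABLE t WHERE (b < 40);

-- default ('new'): the filter is evaluated against the new tuple, so an
-- UPDATE that moves a row out of the filter (b: 32 -> 132) is not replicated
ALTER PUBLICATION pub_t SET (row_filter_on_update = 'new');

-- 'old': the filter is evaluated against the old tuple, so the same kind of
-- UPDATE (b: 34 -> 134) is replicated because the pre-update row matched
ALTER PUBLICATION pub_t SET (row_filter_on_update = 'old');

INSERT and DELETE are not affected by this parameter: they keep using the new and the old tuple, respectively.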