On 1/10/23 11:36, Jacob Champion wrote:
> 1) I'm playing around with a marker in pg_inherits, where the inhseqno
> is set to a sentinel value (0) for an inheritance relationship that
> has been marked for logical publication. The intent is that the
> pg_inherits helpers will prevent further inheritance relationships
> when they see that marker, and reusing inhseqno means we can make use
> of the existing index to do the lookups. An example:
>
> =# CREATE TABLE root (a int);
> =# CREATE TABLE root_p1 () INHERITS (root);
> =# SELECT pg_set_logical_root('root_p1', 'root');
>
> and then any data written to root_p1 gets replicated via root instead,
> if publish_via_partition_root = true. If root_p1 is set up with extra
> columns, they'll be omitted from replication.
First draft attached. (Due to some indentation changes, it's easiest to
read with --ignore-all-space.)
The overall strategy is
- introduce pg_set_logical_root, which sets the sentinel in pg_inherits,
- swap out any checks for partition parents with checks for logical
parents in the publishing code, and
- introduce the ability for a subscriber to perform an initial table
sync from multiple tables on the publisher.
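To make the sentinel idea concrete, here is a small standalone C model of the lookup. It is not code from the patch: `InhRow`, `logical_parent`, and `logical_ancestors` are made-up names, and a plain array stands in for the pg_inherits index scan. It assumes a table has at most one sentinel-marked parent, which is what pg_set_logical_root enforces.

```c
#include <assert.h>
#include <stddef.h>

#define INVALID_OID 0u

/* Hypothetical in-memory stand-in for one pg_inherits row. */
typedef struct InhRow
{
    unsigned inhrelid;   /* child table */
    unsigned inhparent;  /* parent table */
    int      inhseqno;   /* 0 is the sentinel: link is marked as logical */
} InhRow;

/*
 * Return the parent of relid if that inheritance link carries the
 * sentinel, or INVALID_OID if relid has no logical parent.
 */
unsigned
logical_parent(const InhRow *rows, size_t nrows, unsigned relid)
{
    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].inhrelid == relid && rows[i].inhseqno == 0)
            return rows[i].inhparent;
    }
    return INVALID_OID;
}

/*
 * Walk upward from relid, storing ancestors nearest-first in out[].
 * Returns the number of ancestors found; the last entry is the root.
 */
size_t
logical_ancestors(const InhRow *rows, size_t nrows, unsigned relid,
                  unsigned *out, size_t max_out)
{
    size_t   n = 0;
    unsigned parent;

    while ((parent = logical_parent(rows, nrows, relid)) != INVALID_OID)
    {
        assert(n < max_out);    /* caller must size out[] generously */
        out[n++] = parent;
        relid = parent;
    }
    return n;
}
```

A link with a nonzero inhseqno is ordinary inheritance and is ignored by the walk, so only explicitly marked children are routed through their root.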
> 2) While this strategy works well for ongoing replication, it's not
> enough to get the initial synchronization correct. The subscriber
> still does a COPY of the root table directly, missing out on all the
> logical descendant data. The publisher will have to tell the
> subscriber about the relationship somehow, and older subscriber
> versions won't understand how to use that (similar to how old
> subscribers can't correctly handle row filters).
I partially solved this by having the subscriber pull the logical
hierarchy from the publisher to figure out which tables to COPY. This
works when publish_via_partition_root=true, but it doesn't correctly
return to the previous behavior when the setting is false. I need to
check the publication setting from the subscriber, too, but that opens
up the question of what to do if two different publications conflict.
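For illustration only, the subscriber-side expansion can be modeled in standalone C as below. This is a sketch of the idea rather than the patch's implementation (the patch asks the publisher with a recursive query); `InhRow` and `logical_descendants` are hypothetical names, and the array stands in for the publisher's pg_inherits contents.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical in-memory stand-in for one pg_inherits row. */
typedef struct InhRow
{
    unsigned inhrelid;   /* child table */
    unsigned inhparent;  /* parent table */
    int      inhseqno;   /* 0 is the sentinel: link is marked as logical */
} InhRow;

/*
 * Collect root plus every table reachable from it through
 * sentinel-marked links. Returns the number of entries written to out[];
 * each entry is a table the initial sync would COPY into the root.
 */
size_t
logical_descendants(const InhRow *rows, size_t nrows, unsigned root,
                    unsigned *out, size_t max_out)
{
    size_t n = 0;

    assert(max_out > 0);
    out[n++] = root;            /* the root itself is always copied */

    /* out[] doubles as the work queue: entries before next are expanded. */
    for (size_t next = 0; next < n; next++)
    {
        for (size_t i = 0; i < nrows; i++)
        {
            if (rows[i].inhparent == out[next] && rows[i].inhseqno == 0)
            {
                assert(n < max_out);
                out[n++] = rows[i].inhrelid;
            }
        }
    }
    return n;
}
```

A table with no marked children yields a single-entry list, which matches the old behavior of copying only the table itself.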
And while I go down that rabbit hole, I wanted to see if anyone thinks
this whole thing is unacceptable. :D
Thanks,
--Jacob
From 379bf99ea022203d428a4027da753a00a3989c04 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jchamp...@timescale.com>
Date: Mon, 26 Sep 2022 13:23:51 -0700
Subject: [PATCH] WIP: introduce pg_set_logical_root for use with pubviaroot
Allows regular inherited tables to be published via their root table,
just like partitions. This works by hijacking pg_inherits' inhseqno
column, and replacing a (single) existing entry for the child with the
value zero, indicating that it should be treated as a logical partition
by the publication machinery.
(For this to work correctly at the moment, the publication must set
publish_via_partition_root=true. See bugs below.)
Initial sync works by pulling in all logical descendants on the
subscriber, then COPYing them one-by-one into the root. The publisher
reuses the existing pubviaroot logic, adding the new logical roots to
code that previously looked only for partition roots.
Known bugs/TODOs:
- When pubviaroot is false, initial sync doesn't work correctly (it
assumes pubviaroot is on and COPYs all descendants into the root).
- The pg_inherits machinery doesn't prohibit changes to inheritance
after an entry has been marked as a logical root.
- I haven't given any thought to interactions with row filters, or to
column lists, or to multiple publications with conflicting pubviaroot
settings.
- pg_set_logical_root() doesn't check for table ownership yet. Anyone
can muck with pg_inherits through it.
- I'm not sure that I'm taking all the necessary locks yet, and those I
do take may be taken in the wrong order.
---
src/backend/catalog/pg_inherits.c | 154 ++++++++++++++
src/backend/catalog/pg_publication.c | 30 +--
src/backend/commands/publicationcmds.c | 10 +
src/backend/replication/logical/tablesync.c | 221 +++++++++++++-------
src/backend/replication/pgoutput/pgoutput.c | 54 ++---
src/include/catalog/pg_inherits.h | 2 +
src/include/catalog/pg_proc.dat | 5 +
src/test/regress/expected/publication.out | 32 +++
src/test/regress/sql/publication.sql | 25 +++
src/test/subscription/t/013_partition.pl | 186 ++++++++++++++++
10 files changed, 608 insertions(+), 111 deletions(-)
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index da969bd2f9..c290a9936a 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -24,10 +24,13 @@
#include "access/table.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
+#include "catalog/partition.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -655,3 +658,154 @@ PartitionHasPendingDetach(Oid partoid)
elog(ERROR, "relation %u is not a partition", partoid);
return false; /* keep compiler quiet */
}
+
+static Oid
+get_logical_parent_worker(Relation inhRel, Oid relid)
+{
+ SysScanDesc scan;
+ ScanKeyData key[2];
+ Oid result = InvalidOid;
+ HeapTuple tuple;
+
+ ScanKeyInit(&key[0],
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&key[1],
+ Anum_pg_inherits_inhseqno,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 2, key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
+ result = form->inhparent;
+ }
+
+ systable_endscan(scan);
+
+ return result;
+}
+
+static void
+get_logical_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
+{
+ Oid parentOid;
+
+ /*
+ * Recursion ends at the topmost level, i.e., when there's no parent.
+ */
+ parentOid = get_logical_parent_worker(inhRel, relid);
+ if (parentOid == InvalidOid)
+ return;
+
+ *ancestors = lappend_oid(*ancestors, parentOid);
+ get_logical_ancestors_worker(inhRel, parentOid, ancestors);
+}
+
+List *
+get_logical_ancestors(Oid relid, bool is_partition)
+{
+ List *result = NIL;
+ Relation inhRel;
+
+ /* For partitions, this is identical to get_partition_ancestors(). */
+ if (is_partition)
+ return get_partition_ancestors(relid);
+
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
+ get_logical_ancestors_worker(inhRel, relid, &result);
+ table_close(inhRel, AccessShareLock);
+
+ return result;
+}
+
+bool
+has_logical_parent(Relation inhRel, Oid relid)
+{
+ return (get_logical_parent_worker(inhRel, relid) != InvalidOid);
+}
+
+Datum
+pg_set_logical_root(PG_FUNCTION_ARGS)
+{
+ Oid tableoid = PG_GETARG_OID(0);
+ Oid rootoid = PG_GETARG_OID(1);
+ char *tablename;
+ char *rootname;
+ Relation inhRel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ Oid parent = InvalidOid;
+ HeapTuple tuple, copyTuple;
+ Form_pg_inherits form;
+
+ /*
+ * Check that the tables exist.
+ * TODO: check inheritance
+ * TODO: and identical schemas too? or does replication handle that?
+ * TODO: check ownership
+ */
+ tablename = get_rel_name(tableoid);
+ if (tablename == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", tableoid)));
+ rootname = get_rel_name(rootoid);
+ if (rootname == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", rootoid)));
+
+ /* Open pg_inherits with RowExclusiveLock so that we can update it. */
+ inhRel = table_open(InheritsRelationId, RowExclusiveLock);
+
+ /*
+ * We have to make sure that the inheritance relationship already exists,
+ * and that there is only one existing parent for this table.
+ *
+ * TODO: do we have to lock the tables themselves to avoid races?
+ */
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(tableoid));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 1, &key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ form = (Form_pg_inherits) GETSTRUCT(tuple);
+ parent = form->inhparent;
+ copyTuple = heap_copytuple(tuple);
+
+ if (HeapTupleIsValid(systable_getnext(scan)))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" inherits from multiple tables",
+ tablename)));
+ }
+
+ if (parent != rootoid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" does not inherit from intended root table \"%s\"",
+ tablename, rootname)));
+
+ systable_endscan(scan);
+
+ /* Mark the inheritance as a logical root by setting it to zero. */
+ form = (Form_pg_inherits) GETSTRUCT(copyTuple);
+ form->inhseqno = 0;
+
+ CatalogTupleUpdate(inhRel, &copyTuple->t_self, copyTuple);
+
+ heap_freetuple(copyTuple);
+ table_close(inhRel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a98fcad421..e1a02e18d9 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -172,11 +172,11 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
}
/*
- * Filter out the partitions whose parent tables were also specified in
+ * Filter out the tables whose logical parent tables were also specified in
* the publication.
*/
static List *
-filter_partitions(List *relids)
+filter_logical_descendants(List *relids)
{
List *result = NIL;
ListCell *lc;
@@ -188,8 +188,7 @@ filter_partitions(List *relids)
List *ancestors = NIL;
Oid relid = lfirst_oid(lc);
- if (get_rel_relispartition(relid))
- ancestors = get_partition_ancestors(relid);
+ ancestors = get_logical_ancestors(relid, get_rel_relispartition(relid));
foreach(lc2, ancestors)
{
@@ -782,11 +781,15 @@ List *
GetAllTablesPublicationRelations(bool pubviaroot)
{
Relation classRel;
+ Relation inhRel = NULL;
ScanKeyData key[1];
TableScanDesc scan;
HeapTuple tuple;
List *result = NIL;
+ /* TODO: is there a required order to acquire these locks? */
+ if (pubviaroot)
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
classRel = table_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&key[0],
@@ -802,7 +805,8 @@ GetAllTablesPublicationRelations(bool pubviaroot)
Oid relid = relForm->oid;
if (is_publishable_class(relid, relForm) &&
- !(relForm->relispartition && pubviaroot))
+ !(relForm->relispartition && pubviaroot) &&
+ !(pubviaroot && has_logical_parent(inhRel, relid)))
result = lappend_oid(result, relid);
}
@@ -831,6 +835,9 @@ GetAllTablesPublicationRelations(bool pubviaroot)
}
table_close(classRel, AccessShareLock);
+ if (pubviaroot)
+ table_close(inhRel, AccessShareLock);
+
return result;
}
@@ -1076,15 +1083,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
tables = list_concat_unique_oid(relids, schemarelids);
/*
- * If the publication publishes partition changes via their
- * respective root partitioned tables, we must exclude partitions
- * in favor of including the root partitioned tables. Otherwise,
- * the function could return both the child and parent tables
- * which could cause data of the child table to be
- * double-published on the subscriber side.
+ * If the publication publishes table changes via their respective
+ * logical root tables, we must exclude logical descendants in favor
+ * of including the root tables. Otherwise, the function could
+ * return both the child and parent tables which could cause data of
+ * the child table to be double-published on the subscriber side.
*/
if (publication->pubviaroot)
- tables = filter_partitions(tables);
+ tables = filter_logical_descendants(tables);
}
/* Construct a tuple descriptor for the result rows. */
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697..7e23bed6c0 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -238,6 +238,8 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
* parent table, but the bitmap contains the replica identity
* information of the child table. So, get the column number of the
* child table as parent and child column order could be different.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (context->pubviaroot)
{
@@ -286,6 +288,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
*
* Note that even though the row filter used is for an ancestor, the
* REPLICA IDENTITY used will be for the actual child table.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (pubviaroot && relation->rd_rel->relispartition)
{
@@ -336,6 +340,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
* the column list.
*
* Returns true if any replica identity column is not covered by column list.
+ *
+ * TODO: pg_set_logical_root()?
*/
bool
pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
@@ -628,6 +634,8 @@ TransformPubWhereClauses(List *tables, const char *queryString,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using
* WHERE clause on partitioned table in this case.
+ *
+ * TODO: decide how this interacts with pg_set_logical_root
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -715,6 +723,8 @@ CheckPubRelationColumnList(char *pubname, List *tables,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
* a column list on the partitioned table in this case.
+ *
+ * TODO: decide if this interacts with pg_set_logical_root()
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..b83b9e91ef 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -748,11 +748,12 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
* message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * qualifications to be used in the COPY command, and the list of tables to COPY
+ * (which for most tables will contain only one entry).
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+ LogicalRepRelation *lrel, List **qual, List **to_copy)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -1063,6 +1064,73 @@ fetch_remote_table_info(char *nspname, char *relname,
walrcv_clear_result(res);
}
+ /*
+ * See if there are any other tables to be copied besides the original. This
+ * happens when a descendant in the inheritance relationship is marked with
+ * pg_set_logical_root().
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+ {
+ Oid descRow[] = {TEXTOID, TEXTOID};
+
+ /*
+ * Find all logical descendants rooted at this table (including the
+ * table itself).
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "WITH RECURSIVE descendants(relid) AS ("
+ " VALUES (%u::oid) UNION ALL"
+ " SELECT inhrelid"
+ " FROM pg_catalog.pg_inherits i, descendants d"
+ " WHERE i.inhseqno = 0"
+ " AND d.relid = i.inhparent"
+ ")"
+ "SELECT n.nspname, c.relname"
+ " FROM descendants,"
+ " pg_catalog.pg_class c"
+ " JOIN pg_catalog.pg_namespace n"
+ " ON (c.relnamespace = n.oid)"
+ " WHERE c.oid = descendants.relid",
+ lrel->remoteid);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(descRow), descRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch logical descendants for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *desc_nspname;
+ char *desc_relname;
+ char *quoted;
+
+ desc_nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ desc_relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ quoted = quote_qualified_identifier(desc_nspname, desc_relname);
+ *to_copy = lappend(*to_copy, quoted);
+
+ ExecClearTuple(slot);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+ }
+ else
+ {
+ /* For older servers, we only COPY the table itself. */
+ char *quoted = quote_qualified_identifier(lrel->nspname,
+ lrel->relname);
+ *to_copy = lappend(*to_copy, quoted);
+ }
+
pfree(cmd.data);
}
@@ -1077,6 +1145,8 @@ copy_table(Relation rel)
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
+ List *to_copy = NIL;
+ ListCell *cur;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@@ -1085,7 +1155,8 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &to_copy);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1094,92 +1165,96 @@ copy_table(Relation rel)
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
- /* Start copy on the publisher. */
- initStringInfo(&cmd);
-
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ foreach(cur, to_copy)
{
- appendStringInfo(&cmd, "COPY %s (",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+ char *quoted_name = lfirst(cur);
- /*
- * XXX Do we need to list the columns in all cases? Maybe we're
- * replicating all columns?
- */
- for (int i = 0; i < lrel.natts; i++)
- {
- if (i > 0)
- appendStringInfoString(&cmd, ", ");
-
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- }
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
- else
- {
- /*
- * For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
- */
- appendStringInfoString(&cmd, "COPY (SELECT ");
- for (int i = 0; i < lrel.natts; i++)
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
{
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- if (i < lrel.natts - 1)
- appendStringInfoString(&cmd, ", ");
- }
+ appendStringInfo(&cmd, "COPY %s (", quoted_name);
- appendStringInfoString(&cmd, " FROM ");
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're
+ * replicating all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
- /*
- * For regular tables, make sure we don't copy data from a child that
- * inherits the named table as those will be copied separately.
- */
- if (lrel.relkind == RELKIND_RELATION)
- appendStringInfoString(&cmd, "ONLY ");
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
- appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
- /* list of OR'ed filters */
- if (qual != NIL)
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
+ else
{
- ListCell *lc;
- char *q = strVal(linitial(qual));
+ /*
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we need to not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
+ */
+ appendStringInfoString(&cmd, "COPY (SELECT ");
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ if (i < lrel.natts - 1)
+ appendStringInfoString(&cmd, ", ");
+ }
- appendStringInfo(&cmd, " WHERE %s", q);
- for_each_from(lc, qual, 1)
+ appendStringInfoString(&cmd, " FROM ");
+
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
+
+ appendStringInfoString(&cmd, quoted_name);
+ /* list of OR'ed filters */
+ if (qual != NIL)
{
- q = strVal(lfirst(lc));
- appendStringInfo(&cmd, " OR %s", q);
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
+ list_free_deep(qual);
}
- list_free_deep(qual);
- }
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
- pfree(cmd.data);
- if (res->status != WALRCV_OK_COPY_OUT)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not start initial contents copy for table \"%s.%s\": %s",
- lrel.nspname, lrel.relname, res->err)));
- walrcv_clear_result(res);
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
+ pfree(cmd.data);
+ if (res->status != WALRCV_OK_COPY_OUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not start initial contents copy for table \"%s.%s\" from remote %s: %s",
+ lrel.nspname, lrel.relname, quoted_name, res->err)));
+ walrcv_clear_result(res);
- copybuf = makeStringInfo();
+ copybuf = makeStringInfo();
- pstate = make_parsestate(NULL);
- (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
- NULL, false, false);
+ pstate = make_parsestate(NULL);
+ (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
+ NULL, false, false);
- attnamelist = make_copy_attnamelist(relmapentry);
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+ attnamelist = make_copy_attnamelist(relmapentry);
+ cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
- /* Do the copy */
- (void) CopyFrom(cstate);
+ /* Do the copy */
+ (void) CopyFrom(cstate);
+ }
logicalrep_rel_close(relmapentry, NoLock);
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1a80d67bb9..897c5e0d92 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -14,6 +14,7 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
@@ -1450,7 +1451,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
- Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
/* Convert tuple if needed. */
@@ -2141,55 +2141,57 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
int ancestor_level = 0;
/*
- * If this is a FOR ALL TABLES publication, pick the partition
+ * If this is a FOR ALL TABLES publication, pick the logical
* root and set the ancestor level accordingly.
*/
if (pub->alltables)
{
publish = true;
- if (pub->pubviaroot && am_partition)
+ if (pub->pubviaroot)
{
- List *ancestors = get_partition_ancestors(relid);
+ List *ancestors;
- pub_relid = llast_oid(ancestors);
- ancestor_level = list_length(ancestors);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ if (ancestors != NIL)
+ {
+ pub_relid = llast_oid(ancestors);
+ ancestor_level = list_length(ancestors);
+ }
}
}
if (!publish)
{
bool ancestor_published = false;
+ Oid ancestor;
+ int level;
+ List *ancestors;
/*
- * For a partition, check if any of the ancestors are
- * published. If so, note down the topmost ancestor that is
+ * Check if any of the logical ancestors (that is, partition
+ * parents or tables marked with pg_set_logical_root()) are
+ * published. If so, note down the topmost ancestor that is
* published via this publication, which will be used as the
- * relation via which to publish the partition's changes.
+ * relation via which to publish this table's changes.
*/
- if (am_partition)
- {
- Oid ancestor;
- int level;
- List *ancestors = get_partition_ancestors(relid);
-
- ancestor = GetTopMostAncestorInPublication(pub->oid,
- ancestors,
- &level);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors,
+ &level);
- if (ancestor != InvalidOid)
+ if (ancestor != InvalidOid)
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
{
- ancestor_published = true;
- if (pub->pubviaroot)
- {
- pub_relid = ancestor;
- ancestor_level = level;
- }
+ pub_relid = ancestor;
+ ancestor_level = level;
}
}
if (list_member_oid(pubids, pub->oid) ||
list_member_oid(schemaPubids, pub->oid) ||
- ancestor_published)
+ (am_partition && ancestor_published))
publish = true;
}
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index ce154ab943..59d72a97b8 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -63,5 +63,7 @@ extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent,
bool expect_detach_pending,
const char *childname);
extern bool PartitionHasPendingDetach(Oid partoid);
+extern List *get_logical_ancestors(Oid relid, bool is_partition);
+extern bool has_logical_parent(Relation inhRel, Oid relid);
#endif /* PG_INHERITS_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 86eb8e8c58..6f5f85c5cc 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11891,4 +11891,9 @@
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+{ oid => '8136', descr => 'mark a table root for logical replication',
+ proname => 'pg_set_logical_root', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'regclass regclass',
+ prosrc => 'pg_set_logical_root' },
+
]
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 427f87ea07..0e06103f4c 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -1718,9 +1718,41 @@ SELECT * FROM pg_publication_tables;
pub | sch1 | tbl1 | {a} |
(1 row)
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ERROR: table "ipart1" does not inherit from intended root table "iroot"
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+ERROR: table "ipart2" does not inherit from intended root table "tbl1"
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ERROR: table "ipart2" inherits from multiple tables
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index a47c5939d5..52a0d6ba48 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -1087,9 +1087,34 @@ ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (
CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
SELECT * FROM pg_publication_tables;
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
+
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 11a5c3c03e..b6e3143536 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -877,4 +877,190 @@ $result = $node_subscriber2->safe_psql('postgres',
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
is($result, qq(4||1), 'updates of tab5 replicated correctly');
+# Test that replication works for older inheritance/trigger setups as well.
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab1_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab1_1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO itab1_2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab1_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab1_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (CHECK (a = 1)) INHERITS (itab2)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab2_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab2_1 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab2_trigger
+ BEFORE INSERT ON itab2
+ FOR EACH ROW EXECUTE FUNCTION itab2_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab2_1', 'itab2')");
+
+# itab2_1 should be published using its own identity here, since its parent is
+# not included. itab1_1 should be published via its parent, itab1, without
+# duplicating the rows.
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub_viaroot ADD TABLE itab1, itab1_1, itab2_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (2, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (0, 'itab2')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2')");
+
+# Subscriber 1 only creates some of the child tables, and does not set up
+# routing triggers, so we can check where the replicated rows end up.
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (a int, b text)");
+
+# Subscriber 2 has different partition names for itab1, and it doesn't partition
+# itab2 at all.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_subscriber2->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO public.itab1_part1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO public.itab1_part2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_subscriber2->safe_psql('postgres', "
+ CREATE TRIGGER itab_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab_trigger();");
+$node_subscriber2->safe_psql('postgres', "
+ ALTER TABLE itab1 ENABLE ALWAYS TRIGGER itab_trigger;");
+
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+
+$node_subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub_viaroot REFRESH PUBLICATION");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
+
+# check that the initial data is synced and routed correctly
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 1');
+
+# all of the data should have been routed to itab1 directly (there are no
+# triggers on subscriber 1 to move it elsewhere)
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data correctly routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2), 'initial data synced for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1), 'initial data correctly routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2), 'initial data synced for itab2 on subscriber 2');
+
+# make sure new data is also correctly routed to the roots
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2-new')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2
+1|itab2-new), 'new data routed for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_part1 ORDER BY 1, 2");
+is($result, qq(1|itab1
+1|itab1-new), 'new data moved to itab1_part1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2
+1|itab2-new), 'new data routed for itab2 on subscriber 2');
+
done_testing();
--
2.25.1