On 2021-Dec-31, Justin Pryzby wrote:
> > @@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
> > {
> > + "
> > CASE WHEN pr.prattrs IS NOT NULL THEN\n"
> > + "
> > pg_catalog.array_to_string"
> > +
> > "(ARRAY(SELECT attname\n"
> > + "
> > FROM pg_catalog.generate_series(0,
> > pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
> > + "
> > pg_catalog.pg_attribute\n"
> > + "
> > WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
> > + "
> > ELSE NULL END AS columns");
> I suppose this should use pr.prattrs::pg_catalog.int2[] ?
True. Changed that.
Another change in this v15 is that I renamed the test file from ".patch"
to ".pl". I suppose I mistyped the extension when renumbering from 021
to 028.
> Did the DatumGetBool issue expose a deficiency in testing ?
> I think the !am_partition path was never being hit.
Hmm, the TAP test creates a subscription that contains both types of
tables. I tried adding an assert for each case, and they were both hit
on running the test.
--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/
"La persona que no quería pecar / estaba obligada a sentarse
en duras y empinadas sillas / desprovistas, por cierto
de blandos atenuantes" (Patricio Vogel)
>From ad2766290e7011481813ce24c1947bff70415211 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <[email protected]>
Date: Mon, 6 Sep 2021 10:34:29 -0300
Subject: [PATCH v15] Support column lists for logical replication of tables
Add the capability of specifying a column list for individual tables as
part of a publication. Columns not in the list are not published. This
enables replicating to a table with only a subset of the columns.
If no column list is specified, all the columns are replicated, as
previously
Author: Rahila Syed <[email protected]>
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektnrh8nj1jf...@mail.gmail.com
---
doc/src/sgml/protocol.sgml | 4 +-
doc/src/sgml/ref/alter_publication.sgml | 20 +-
doc/src/sgml/ref/create_publication.sgml | 11 +-
src/backend/catalog/pg_publication.c | 306 +++++++++++++++++++-
src/backend/commands/publicationcmds.c | 67 ++++-
src/backend/commands/tablecmds.c | 79 ++++-
src/backend/nodes/copyfuncs.c | 1 +
src/backend/nodes/equalfuncs.c | 1 +
src/backend/parser/gram.y | 60 +++-
src/backend/replication/logical/proto.c | 66 +++--
src/backend/replication/logical/tablesync.c | 120 +++++++-
src/backend/replication/pgoutput/pgoutput.c | 78 ++++-
src/bin/pg_dump/pg_dump.c | 41 ++-
src/bin/pg_dump/pg_dump.h | 1 +
src/bin/psql/describe.c | 26 +-
src/bin/psql/tab-complete.c | 2 +
src/include/catalog/pg_publication.h | 5 +
src/include/catalog/pg_publication_rel.h | 3 +
src/include/nodes/parsenodes.h | 4 +-
src/include/replication/logicalproto.h | 6 +-
src/test/regress/expected/publication.out | 33 ++-
src/test/regress/sql/publication.sql | 20 +-
src/test/subscription/t/028_column_list.pl | 164 +++++++++++
23 files changed, 1034 insertions(+), 84 deletions(-)
create mode 100644 src/test/subscription/t/028_column_list.pl
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 34a7034282..5bc2e7a591 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6877,7 +6877,9 @@ Relation
</listitem>
</varlistentry>
</variablelist>
- Next, the following message part appears for each column (except generated columns):
+ Next, the following message part appears for each column (except
+ generated columns and other columns that don't appear in the column
+ filter list, for tables that have one):
<variablelist>
<varlistentry>
<term>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..16a12b44b9 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -25,12 +25,13 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD <replace
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET <replaceable class="parameter">publication_object</replaceable> [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP <replaceable class="parameter">publication_object</replaceable> [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ALTER TABLE <replaceable class="parameter">publication_object</replaceable> SET COLUMNS { ( <replaceable class="parameter">name</replaceable> [, ...] ) | ALL }
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@@ -62,6 +63,11 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
command retain their previous settings.
</para>
+ <para>
+ The <literal>ALTER TABLE ... SET COLUMNS</literal> variant allows to change
+ the set of columns that are included in the publication.
+ </para>
+
<para>
The remaining variants change the owner and the name of the publication.
</para>
@@ -110,6 +116,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
+ Optionally, a column list can be specified. See <xref
+ linkend="sql-createpublication"/> for details.
</para>
</listitem>
</varlistentry>
@@ -164,9 +172,15 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
</para>
<para>
- Add some tables to the publication:
+ Add tables to the publication:
<programlisting>
-ALTER PUBLICATION mypublication ADD TABLE users, departments;
+ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
+</programlisting></para>
+
+ <para>
+ Change the set of columns published for a table:
+<programlisting>
+ALTER PUBLICATION mypublication ALTER TABLE users SET COLUMNS (user_id, firstname, lastname);
</programlisting></para>
<para>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
publication, so they are never explicitly added to the publication.
</para>
+ <para>
+ When a column list is specified, only the listed columns are replicated;
+ any other columns are ignored for the purpose of replication through
+ this publication. If no column list is specified, all columns of the
+ table are replicated through this publication, including any columns
+ added later. If a column list is specified, it must include the replica
+ identity columns.
+ </para>
+
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b307bc2ed5..af5d1a281f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -45,13 +45,23 @@
#include "utils/rel.h"
#include "utils/syscache.h"
+
+static void check_publication_columns(Relation targetrel, Bitmapset *columns);
+static AttrNumber *publication_translate_columns(Relation targetrel, List *columns,
+ int *natts, Bitmapset **attset);
+
/*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * targetcols is the bitmapset of column specified as column filter, or NULL if
+ * no column filter was specified.
*/
static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
{
+ bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +92,63 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
+
+ /* Make sure the column list checks out */
+ if (columns != NULL)
+ {
+ /*
+ * Even if the user listed all columns in the column list, we cannot
+ * allow a column list to be specified when REPLICA IDENTITY is FULL;
+ * that would cause problems if a new column is added later, because
+ * the new column would have to be included (because of being part of
+ * the replica identity) but it's technically not allowed (because of
+ * not being in the publication's column list yet). So reject this
+ * case altogether.
+ */
+ if (replidentfull)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("invalid column list for publishing relation \"%s\"",
+ RelationGetRelationName(targetrel)),
+ errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+ check_publication_columns(targetrel, columns);
+ }
+}
+
+/*
+ * Enforce that the column filter can only leave out columns that aren't
+ * forced to be sent.
+ *
+ * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+ * columns need to be sent regardless); and in other cases, the columns in
+ * the REPLICA IDENTITY cannot be left out.
+ */
+static void
+check_publication_columns(Relation targetrel, Bitmapset *columns)
+{
+ Bitmapset *idattrs;
+ int x;
+
+ idattrs = RelationGetIndexAttrBitmap(targetrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+ /*
+ * We have to test membership the hard way, because the values returned by
+ * RelationGetIndexAttrBitmap are offset.
+ */
+ x = -1;
+ while ((x = bms_next_member(idattrs, x)) >= 0)
+ {
+ if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("invalid column list for publishing relation \"%s\"",
+ RelationGetRelationName(targetrel)),
+ errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+ }
+
+ bms_free(idattrs);
}
/*
@@ -289,6 +356,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
Oid relid = RelationGetRelid(targetrel->relation);
Oid pubreloid;
Publication *pub = GetPublication(pubid);
+ Bitmapset *attset = NULL;
+ AttrNumber *attarray;
+ int natts = 0;
ObjectAddress myself,
referenced;
List *relids = NIL;
@@ -314,7 +384,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
RelationGetRelationName(targetrel->relation), pub->name)));
}
- check_publication_add_relation(targetrel->relation);
+ /* Translate column names to numbers and verify suitability */
+ attarray = publication_translate_columns(targetrel->relation,
+ targetrel->columns,
+ &natts, &attset);
+
+ check_publication_add_relation(targetrel->relation, attset);
+
+ bms_free(attset);
/* Form a tuple. */
memset(values, 0, sizeof(values));
@@ -327,6 +404,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
ObjectIdGetDatum(pubid);
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);
+ if (targetrel->columns)
+ {
+ int2vector *prattrs;
+
+ prattrs = buildint2vector(attarray, natts);
+ values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+ }
+ else
+ nulls[Anum_pg_publication_rel_prattrs - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -337,6 +423,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
/* Register dependencies as needed */
ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
+ /* Add dependency on the columns, if any are listed */
+ for (int i = 0; i < natts; i++)
+ {
+ ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+ }
+ pfree(attarray);
/* Add dependency on the publication */
ObjectAddressSet(referenced, PublicationRelationId, pubid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
@@ -364,6 +457,143 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
return myself;
}
+/*
+ * Update the column list for a relation in a publication.
+ */
+void
+publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+ Relation targetrel, List *columns)
+{
+ Bitmapset *attset;
+ AttrNumber *attarray;
+ HeapTuple copytup;
+ int natts;
+ bool nulls[Natts_pg_publication_rel];
+ bool replaces[Natts_pg_publication_rel];
+ Datum values[Natts_pg_publication_rel];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, 0, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_publication_rel_prattrs - 1] = true;
+
+ deleteDependencyRecordsForClass(PublicationRelationId,
+ ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid,
+ RelationRelationId,
+ DEPENDENCY_AUTO);
+
+ if (columns == NULL)
+ {
+ nulls[Anum_pg_publication_rel_prattrs - 1] = true;
+ }
+ else
+ {
+ ObjectAddress myself,
+ referenced;
+ int2vector *prattrs;
+
+ if (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot change column set for relation \"%s\"",
+ RelationGetRelationName(targetrel)),
+ errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+ attarray = publication_translate_columns(targetrel, columns,
+ &natts, &attset);
+
+ /*
+ * Make sure the column list checks out. XXX this should occur at
+ * caller in publicationcmds.c, not here.
+ */
+ check_publication_columns(targetrel, attset);
+ bms_free(attset);
+
+ prattrs = buildint2vector(attarray, natts);
+ values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+
+ /* Add dependencies on the new list of columns */
+ ObjectAddressSet(myself, PublicationRelRelationId,
+ ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid);
+ for (int i = 0; i < natts; i++)
+ {
+ ObjectAddressSubSet(referenced, RelationRelationId,
+ RelationGetRelid(targetrel), attarray[i]);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+ }
+ }
+
+ copytup = heap_modify_tuple(pubreltup, RelationGetDescr(pubrel),
+ values, nulls, replaces);
+
+ CatalogTupleUpdate(pubrel, &pubreltup->t_self, copytup);
+
+ heap_freetuple(copytup);
+}
+
+/*
+ * Translate a list of column names to an array of attribute numbers
+ * and a Bitmapset with them; verify that each attribute is appropriate
+ * to have in a publication column list. Other checks are done later;
+ * see check_publication_columns.
+ *
+ * Note that the attribute numbers are *not* offset by
+ * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
+ * is okay.
+ */
+static AttrNumber *
+publication_translate_columns(Relation targetrel, List *columns, int *natts,
+ Bitmapset **attset)
+{
+ AttrNumber *attarray;
+ Bitmapset *set = NULL;
+ ListCell *lc;
+ int n = 0;
+
+ /*
+ * Translate list of columns to attnums. We prohibit system attributes and
+ * make sure there are no duplicate columns.
+ */
+ attarray = palloc(sizeof(AttrNumber) * list_length(columns));
+ foreach(lc, columns)
+ {
+ char *colname = strVal(lfirst(lc));
+ AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
+
+ if (attnum == InvalidAttrNumber)
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colname, RelationGetRelationName(targetrel)));
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot reference system column \"%s\" in publication column list",
+ colname));
+
+ if (bms_is_member(attnum, set))
+ ereport(ERROR,
+ errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("duplicate column \"%s\" in publication column list",
+ colname));
+
+ set = bms_add_member(set, attnum);
+ attarray[n++] = attnum;
+ }
+
+ /*
+ * XXX qsort the array here, or maybe build just the bitmapset above and
+ * then scan that in order to produce the array? Do we care about the
+ * array being unsorted?
+ */
+
+ *natts = n;
+ *attset = set;
+ return attarray;
+}
+
/*
* Insert new publication / schema mapping.
*/
@@ -471,6 +701,74 @@ GetRelationPublications(Oid relid)
return result;
}
+/*
+ * Gets a list of OIDs of all partial-column publications of the given
+ * relation, that is, those that specify a column list.
+ */
+List *
+GetRelationColumnPartialPublications(Oid relid)
+{
+ CatCList *pubrellist;
+ List *pubs = NIL;
+
+ pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid));
+ for (int i = 0; i < pubrellist->n_members; i++)
+ {
+ HeapTuple tup = &pubrellist->members[i]->tuple;
+ bool isnull;
+
+ (void) SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+ if (isnull)
+ continue;
+
+ pubs = lappend_oid(pubs,
+ ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid);
+ }
+
+ ReleaseSysCacheList(pubrellist);
+
+ return pubs;
+}
+
+/*
+ * For a relation in a publication that is known to have a non-null column
+ * list, return the list of attribute numbers that are in it.
+ */
+List *
+GetRelationColumnListInPublication(Oid relid, Oid pubid)
+{
+ HeapTuple tup;
+ Datum adatum;
+ bool isnull;
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
+ List *attnos = NIL;
+
+ tup = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid);
+ adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+ Anum_pg_publication_rel_prattrs, &isnull);
+ if (isnull)
+ elog(ERROR, "found unexpected null in pg_publication_rel.prattrs");
+ arr = DatumGetArrayTypeP(adatum);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ for (int i = 0; i < nelems; i++)
+ attnos = lappend_oid(attnos, elems[i]);
+
+ ReleaseSysCache(tup);
+
+ return attnos;
+}
+
/*
* Gets list of relation oids for a publication.
*
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 0f04969fd6..657374c0d1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -376,6 +376,46 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
return myself;
}
+/*
+ * Change the column list of a relation in a publication
+ */
+static void
+PublicationSetColumns(AlterPublicationStmt *stmt,
+ Form_pg_publication pubform, PublicationTable *table)
+{
+ Relation rel,
+ urel;
+ HeapTuple tup;
+ ObjectAddress obj,
+ secondary;
+
+ rel = table_open(PublicationRelRelationId, RowExclusiveLock);
+ urel = table_openrv(table->relation, ShareUpdateExclusiveLock);
+
+ tup = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(RelationGetRelid(urel)),
+ ObjectIdGetDatum(pubform->oid));
+ if (!HeapTupleIsValid(tup))
+ ereport(ERROR,
+ errmsg("relation \"%s\" is not already in publication \"%s\"",
+ table->relation->relname,
+ NameStr(pubform->pubname)));
+
+ publication_set_table_columns(rel, tup, urel, table->columns);
+
+ ObjectAddressSet(obj, PublicationRelationId,
+ ((Form_pg_publication_rel) GETSTRUCT(tup))->oid);
+ ObjectAddressSet(secondary, RelationRelationId, RelationGetRelid(urel));
+ EventTriggerCollectSimpleCommand(obj, secondary, (Node *) stmt);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+ table_close(urel, NoLock);
+
+ InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
+}
+
/*
* Change options of a publication.
*/
@@ -523,6 +563,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
}
else if (stmt->action == AP_DropObjects)
PublicationDropTables(pubid, rels, false);
+ else if (stmt->action == AP_SetColumns)
+ {
+ Assert(schemaidlist == NIL);
+ Assert(list_length(tables) == 1);
+
+ PublicationSetColumns(stmt, pubform,
+ linitial_node(PublicationTable, tables));
+ }
else /* AP_SetObjects */
{
List *oldrelids = GetPublicationRelations(pubid,
@@ -562,7 +610,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
pubrel = palloc(sizeof(PublicationRelInfo));
pubrel->relation = oldrel;
-
+ /* This is not needed to delete a table */
+ pubrel->columns = NIL;
delrels = lappend(delrels, pubrel);
}
}
@@ -622,7 +671,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
}
else if (stmt->action == AP_DropObjects)
PublicationDropSchemas(pubform->oid, schemaidlist, false);
- else /* AP_SetObjects */
+ else if (stmt->action == AP_SetObjects)
{
List *oldschemaids = GetPublicationSchemas(pubform->oid);
List *delschemas = NIL;
@@ -645,6 +694,10 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
*/
PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
}
+ else
+ {
+ /* Nothing to do for AP_SetColumns */
+ }
}
/*
@@ -934,6 +987,8 @@ OpenTableList(List *tables)
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ pub_rel->columns = t->columns;
+
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
@@ -967,8 +1022,11 @@ OpenTableList(List *tables)
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
+
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ pub_rel->columns = t->columns;
+
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
}
@@ -1076,6 +1134,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
+ if (pubrel->columns)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
+
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3631b8a929..232a068613 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8347,6 +8347,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
bool missing_ok, LOCKMODE lockmode,
ObjectAddresses *addrs)
{
+ Oid relid = RelationGetRelid(rel);
HeapTuple tuple;
Form_pg_attribute targetatt;
AttrNumber attnum;
@@ -8366,7 +8367,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
/*
* get the number of the attribute
*/
- tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+ tuple = SearchSysCacheAttName(relid, colName);
if (!HeapTupleIsValid(tuple))
{
if (!missing_ok)
@@ -8420,13 +8421,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
ReleaseSysCache(tuple);
+ /*
+ * Also, if the column is used in the column list of a publication,
+ * disallow the drop if the DROP is RESTRICT. We don't do anything if the
+ * DROP is CASCADE, which means that the dependency mechanism will remove
+ * the relation from the publication.
+ */
+ if (behavior == DROP_RESTRICT)
+ {
+ List *pubs;
+ ListCell *lc;
+
+ pubs = GetRelationColumnPartialPublications(relid);
+ foreach(lc, pubs)
+ {
+ Oid pubid = lfirst_oid(lc);
+ List *published_cols;
+
+ published_cols =
+ GetRelationColumnListInPublication(relid, pubid);
+
+ if (list_member_oid(published_cols, attnum))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"",
+ colName, get_publication_name(pubid, false)),
+ errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication."));
+ }
+ }
+
/*
* Propagate to children as appropriate. Unlike most other ALTER
* routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass.
*/
children =
- find_inheritance_children(RelationGetRelid(rel), lockmode);
+ find_inheritance_children(relid, lockmode);
if (children)
{
@@ -8514,7 +8544,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
/* Add object to delete */
object.classId = RelationRelationId;
- object.objectId = RelationGetRelid(rel);
+ object.objectId = relid;
object.objectSubId = attnum;
add_exact_object_address(&object, addrs);
@@ -15581,6 +15611,7 @@ relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid,
CatalogTupleUpdate(pg_index, &pg_index_tuple->t_self, pg_index_tuple);
InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
InvalidOid, is_internal);
+
/*
* Invalidate the relcache for the table, so that after we commit
* all sessions will refresh the table's replica identity index
@@ -15603,6 +15634,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
Oid indexOid;
Relation indexRel;
int key;
+ List *pubs;
+ Bitmapset *indexed_cols = NULL;
+ ListCell *lc;
+
+ pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel));
if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
{
@@ -15611,11 +15647,16 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
}
else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
{
+ if (pubs != NIL)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set REPLICA IDENTITY FULL when publications contain relations that specify column lists"));
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
return;
}
else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
{
+ /* XXX not sure what's the right check for publications here */
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
return;
}
@@ -15700,6 +15741,38 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
RelationGetRelationName(indexRel),
NameStr(attr->attname))));
+
+ /*
+ * Collect columns used, in case we have any publications that we need
+ * to vet. System attributes are disallowed so no need to subtract
+ * FirstLowInvalidHeapAttributeNumber.
+ */
+ indexed_cols = bms_add_member(indexed_cols, attno);
+ }
+
+ /*
+ * Check partial-column publications. All publications have to include
+ * all key columns of the new index.
+ */
+ foreach(lc, pubs)
+ {
+ Oid pubid = lfirst_oid(lc);
+ List *published_cols;
+
+ published_cols =
+ GetRelationColumnListInPublication(RelationGetRelid(rel), pubid);
+
+ for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
+ {
+ int16 attno = indexRel->rd_index->indkey.values[key];
+
+ if (!list_member_oid(published_cols, attno))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns",
+ RelationGetRelationName(indexRel),
+ get_publication_name(pubid, false)));
+ }
}
/* This index is suitable for use as a replica identity. Mark it. */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
PublicationTable *newnode = makeNode(PublicationTable);
COPY_NODE_FIELD(relation);
+ COPY_NODE_FIELD(columns);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
+ COMPARE_NODE_FIELD(columns);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6dddc07947..068f67998c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9740,12 +9740,13 @@ CreatePublicationStmt:
* relation_expr here.
*/
PublicationObjSpec:
- TABLE relation_expr
+ TABLE relation_expr opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
+ $$->pubtable->columns = $3;
}
| ALL TABLES IN_P SCHEMA ColId
{
@@ -9760,28 +9761,38 @@ PublicationObjSpec:
$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
$$->location = @5;
}
- | ColId
+ | ColId opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
- $$->name = $1;
+ if ($2 != NULL)
+ {
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+ $$->pubtable->columns = $2;
+ $$->name = NULL;
+ }
+ else
+ $$->name = $1;
$$->location = @1;
}
- | ColId indirection
+ | ColId indirection opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+ $$->pubtable->columns = $3;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
- | extended_relation_expr
+ | extended_relation_expr opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
+ $$->pubtable->columns = $2;
}
| CURRENT_SCHEMA
{
@@ -9807,6 +9818,9 @@ pub_obj_list: PublicationObjSpec
*
* ALTER PUBLICATION name SET pub_obj [, ...]
*
+ * ALTER PUBLICATION name SET COLUMNS table_name (column[, ...])
+ * ALTER PUBLICATION name SET COLUMNS table_name ALL
+ *
* pub_obj is one of:
*
* TABLE table_name [, ...]
@@ -9840,6 +9854,32 @@ AlterPublicationStmt:
n->action = AP_SetObjects;
$$ = (Node *)n;
}
+ | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')'
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+ obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+ obj->pubtable = makeNode(PublicationTable);
+ obj->pubtable->relation = $6;
+ obj->pubtable->columns = $10;
+ n->pubname = $3;
+ n->pubobjects = list_make1(obj);
+ n->action = AP_SetColumns;
+ $$ = (Node *) n;
+ }
+ | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS ALL
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+ obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+ obj->pubtable = makeNode(PublicationTable);
+ obj->pubtable->relation = $6;
+ obj->pubtable->columns = NIL;
+ n->pubname = $3;
+ n->pubobjects = list_make1(obj);
+ n->action = AP_SetColumns;
+ $$ = (Node *) n;
+ }
| ALTER PUBLICATION name DROP pub_obj_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
@@ -17444,6 +17484,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA ||
pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
{
+ /*
+ * This can happen if a column list is specified in a continuation
+ * for a schema entry; reject it.
+ */
+ if (pubobj->pubtable)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column specification not allowed for schemas"),
+ parser_errposition(pubobj->location));
+
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..3428984130 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,11 @@
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+ Bitmapset *columns);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple, bool binary);
+ HeapTuple tuple, bool binary,
+ Bitmapset *columns);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple newtuple, bool binary)
+ HeapTuple newtuple, bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newtuple, binary, columns);
}
/*
@@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+ HeapTuple oldtuple, HeapTuple newtuple, bool binary,
+ Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldtuple, binary, columns);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newtuple, binary, columns);
}
/*
@@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
}
/*
@@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+ Bitmapset *columns)
{
char *relname;
@@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
- logicalrep_write_attrs(out, rel);
+ logicalrep_write_attrs(out, rel, columns);
}
/*
@@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple,
+ bool binary, Bitmapset *columns)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
@@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
for (i = 0; i < desc->natts; i++)
{
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ /* Don't count attributes that are not to be sent. */
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
continue;
nliveatts++;
}
@@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
if (att->attisdropped || att->attgenerated)
continue;
+ /* Ignore attributes that are not to be sent. */
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
+
if (isnull[i])
{
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
TupleDesc desc;
int i;
@@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
desc = RelationGetDescr(rel);
- /* send number of live attributes */
- for (i = 0; i < desc->natts; i++)
- {
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
- continue;
- nliveatts++;
- }
- pq_sendint16(out, nliveatts);
-
/* fetch bitmap of REPLICATION IDENTITY attributes */
replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
if (!replidentfull)
idattrs = RelationGetIdentityKeyBitmap(rel);
+ /* send number of live attributes */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
+ nliveatts++;
+ }
+ pq_sendint16(out, nliveatts);
+
/* send the attributes */
for (i = 0; i < desc->natts; i++)
{
@@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
if (att->attisdropped || att->attgenerated)
continue;
-
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..35f1294ae4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -697,17 +698,20 @@ fetch_remote_table_info(char *nspname, char *relname,
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
bool isnull;
int natt;
+ ListCell *lc;
+ bool am_partition;
+ Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
lrel->relname = relname;
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+ appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
" FROM pg_catalog.pg_class c"
" INNER JOIN pg_catalog.pg_namespace n"
" ON (c.relnamespace = n.oid)"
@@ -737,14 +741,19 @@ fetch_remote_table_info(char *nspname, char *relname,
Assert(!isnull);
lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
+ am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull));
+ Assert(!isnull);
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- /* Now fetch columns. */
+ /*
+ * Now fetch column names and types.
+ */
resetStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT a.attname,"
+ "SELECT a.attnum,"
+ " a.attname,"
" a.atttypid,"
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
@@ -772,16 +781,92 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL;
+ /*
+ * In server versions 15 and higher, obtain the applicable column filter,
+ * if any.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ WalRcvExecResult *pubres;
+ TupleTableSlot *slot;
+ Oid attrsRow[] = {INT2OID};
+ StringInfoData publications;
+ bool first = true;
+
+ initStringInfo(&publications);
+ foreach(lc, MySubscription->publications)
+ {
+ if (!first)
+ appendStringInfo(&publications, ", ");
+ appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc))));
+ first = false;
+ }
+
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ " SELECT pg_catalog.unnest(prattrs)\n"
+ " FROM pg_catalog.pg_publication p JOIN\n"
+ " pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n"
+ " WHERE p.pubname IN (%s) AND\n",
+ publications.data);
+ if (!am_partition)
+ appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid);
+ else
+ appendStringInfo(&cmd,
+ "prrelid IN (SELECT relid\n"
+ " FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+ lrel->remoteid);
+
+ pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrsRow), attrsRow);
+
+ if (pubres->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, pubres->err)));
+
+ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+ {
+ AttrNumber attnum;
+
+ attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
+ if (isnull)
+ continue;
+ included_cols = bms_add_member(included_cols, attnum);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+ pfree(publications.data);
+ walrcv_clear_result(pubres);
+ }
+
+ /*
+ * Store the column names only if they are contained in column filter.
+ * LogicalRepRelation will only contain attributes corresponding to those
+ * specified in column filters.
+ */
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
- lrel->attnames[natt] =
- TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ char *rel_colname;
+ AttrNumber attnum;
+
+ attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
- lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+ if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+ continue;
+
+ rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
- if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+ lrel->attnames[natt] = rel_colname;
+ lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
/* Should never happen. */
@@ -791,12 +876,12 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecClearTuple(slot);
}
+
ExecDropSingleTupleTableSlot(slot);
-
- lrel->natts = natt;
-
walrcv_clear_result(res);
pfree(cmd.data);
+
+ lrel->natts = natt;
}
/*
@@ -829,8 +914,17 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
if (lrel.relkind == RELKIND_RELATION)
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ {
+ appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ if (i < lrel.natts - 1)
+ appendStringInfoString(&cmd, ", ");
+ }
+ appendStringInfo(&cmd, ") TO STDOUT");
+ }
else
{
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..34df5d4956 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
+#include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx);
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@@ -130,6 +134,13 @@ typedef struct RelationSyncEntry
* having identical TupleDesc.
*/
TupleConversionMap *map;
+
+ /*
+ * Set of columns included in the publication, or NULL if all columns are
+ * included implicitly. Note that the attnums in this list are not
+ * shifted by FirstLowInvalidHeapAttributeNumber.
+ */
+ Bitmapset *columns;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
}
MemoryContextSwitchTo(oldctx);
- send_relation_and_attrs(ancestor, xid, ctx);
+ send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, xid, ctx);
+ send_relation_and_attrs(relation, xid, ctx, relentry->columns);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
*/
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx)
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
if (att->atttypid < FirstGenbkiObjectId)
continue;
+ /* Skip if attribute is not present in column filter. */
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
+
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, xid, relation);
+ logicalrep_write_rel(ctx->out, xid, relation, columns);
OutputPluginWrite(ctx, false);
}
@@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, relation, tuple,
- data->binary);
+ data->binary, relentry->columns);
OutputPluginWrite(ctx, true);
break;
}
@@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, xid, relation, oldtuple,
- newtuple, data->binary);
+ newtuple, data->binary, relentry->columns);
OutputPluginWrite(ctx, true);
break;
}
@@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
bool found;
+ Oid ancestor_id;
MemoryContext oldctx;
Assert(RelationSyncCache != NULL);
@@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->publish_as_relid = InvalidOid;
+ entry->columns = NULL;
entry->map = NULL; /* will be set by maybe_send_schema() if
* needed */
}
@@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
{
Publication *pub = lfirst(lc);
bool publish = false;
+ bool ancestor_published = false;
if (pub->alltables)
{
@@ -1192,8 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
if (!publish)
{
- bool ancestor_published = false;
-
/*
* For a partition, check if any of the ancestors are
* published. If so, note down the topmost ancestor that is
@@ -1219,6 +1236,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
pub->oid))
{
ancestor_published = true;
+ ancestor_id = ancestor;
if (pub->pubviaroot)
publish_as_relid = ancestor;
}
@@ -1239,15 +1257,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
if (publish &&
(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
+ Oid relid;
+ HeapTuple pub_rel_tuple;
+
+ relid = ancestor_published ? ancestor_id : publish_as_relid;
+ pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(pub_rel_tuple))
+ {
+ Datum pub_rel_cols;
+ bool isnull;
+
+ pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+ pub_rel_tuple,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+ if (!isnull)
+ {
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
+
+ arr = DatumGetArrayTypeP(pub_rel_cols);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ /* XXX is there a danger of memory leak here? beware */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ for (int i = 0; i < nelems; i++)
+ entry->columns = bms_add_member(entry->columns,
+ elems[i]);
+ MemoryContextSwitchTo(oldctx);
+ }
+ ReleaseSysCache(pub_rel_tuple);
+ }
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
-
- if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
- break;
}
list_free(pubids);
@@ -1343,6 +1393,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
if (entry->map)
{
/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7af6dfa575..c7c5f3de66 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4055,6 +4055,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
int i_oid;
int i_prpubid;
int i_prrelid;
+ int i_prattrs;
int i,
j,
ntups;
@@ -4066,8 +4067,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
/* Collect all publication membership info. */
appendPQExpBufferStr(query,
- "SELECT tableoid, oid, prpubid, prrelid "
- "FROM pg_catalog.pg_publication_rel");
+ "SELECT tableoid, oid, prpubid, prrelid");
+ if (fout->remoteVersion >= 150000)
+ appendPQExpBufferStr(query, ", prattrs");
+ else
+ appendPQExpBufferStr(query, ", NULL as prattrs");
+ appendPQExpBufferStr(query,
+ " FROM pg_catalog.pg_publication_rel");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
@@ -4076,6 +4082,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
i_oid = PQfnumber(res, "oid");
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
+ i_prattrs = PQfnumber(res, "prattrs");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4117,6 +4124,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
pubrinfo[j].publication = pubinfo;
pubrinfo[j].pubtable = tbinfo;
+ if (!PQgetisnull(res, i, i_prattrs))
+ {
+ char **attnames;
+ int nattnames;
+ PQExpBuffer attribs;
+
+ if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+ &attnames, &nattnames))
+ fatal("could not parse %s array", "prattrs");
+ attribs = createPQExpBuffer();
+ for (int k = 0; k < nattnames; k++)
+ {
+ if (k > 0)
+ appendPQExpBufferStr(attribs, ", ");
+
+ appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+ }
+ pubrinfo[i].pubrattrs = attribs->data;
+ }
+ else
+ pubrinfo[j].pubrattrs = NULL;
+
/* Decide whether we want to dump it */
selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
@@ -4191,10 +4220,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
query = createPQExpBuffer();
- appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+ appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
fmtId(pubinfo->dobj.name));
- appendPQExpBuffer(query, " %s;\n",
- fmtQualifiedDumpable(tbinfo));
+ appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+ if (pubrinfo->pubrattrs)
+ appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+ appendPQExpBufferStr(query, ";\n");
/*
* There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index f9deb321ac..f3d3689ac9 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo
DumpableObject dobj;
PublicationInfo *publication;
TableInfo *pubtable;
+ char *pubrattrs;
} PublicationRelInfo;
/*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c28788e84f..e9c2650b49 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5815,7 +5815,7 @@ listPublications(const char *pattern)
*/
static bool
addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
- bool singlecol, printTableContent *cont)
+ bool as_schema, printTableContent *cont)
{
PGresult *res;
int count = 0;
@@ -5832,10 +5832,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
for (i = 0; i < count; i++)
{
- if (!singlecol)
+ if (!as_schema) /* as table */
+ {
printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
- else
+ if (!PQgetisnull(res, i, 2))
+ appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+ }
+ else /* as schema */
printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
printTableAddFooter(cont, buf->data);
@@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
{
/* Get the tables for the specified publication */
printfPQExpBuffer(&buf,
- "SELECT n.nspname, c.relname\n"
- "FROM pg_catalog.pg_class c,\n"
+ "SELECT n.nspname, c.relname, \n");
+ if (pset.sversion >= 150000)
+ appendPQExpBufferStr(&buf,
+ " CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+ " pg_catalog.array_to_string"
+ "(ARRAY(SELECT attname\n"
+ " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+ " ELSE NULL END AS columns");
+ else
+ appendPQExpBufferStr(&buf, "NULL as columns");
+ appendPQExpBuffer(&buf,
+ "\nFROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
" pg_catalog.pg_publication_rel pr\n"
"WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index cf30239f6d..25c7c08040 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> ADD */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+ COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
/* ALTER PUBLICATION <name> DROP */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..edd4f0c63c 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
typedef struct PublicationRelInfo
{
Relation relation;
+ List *columns;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
@@ -109,6 +110,8 @@ typedef enum PublicationPartOpt
} PublicationPartOpt;
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetRelationColumnPartialPublications(Oid relid);
+extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern List *GetPublicationSchemas(Oid pubid);
@@ -127,6 +130,8 @@ extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *tar
bool if_not_exists);
extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
bool if_not_exists);
+extern void publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+ Relation targetrel, List *columns);
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..51946cce59 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
Oid oid; /* oid */
Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */
+#ifdef CATALOG_VARLEN /* variable-length fields start here */
+ int2vector prattrs;
+#endif
} FormData_pg_publication_rel;
/* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 593e301f7a..f98a78c016 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
{
NodeTag type;
RangeVar *relation; /* relation to be published */
+ List *columns; /* List of columns in a publication table */
} PublicationTable;
/*
@@ -3678,7 +3679,8 @@ typedef enum AlterPublicationAction
{
AP_AddObjects, /* add objects to publication */
AP_DropObjects, /* remove objects from publication */
- AP_SetObjects /* set list of objects */
+ AP_SetObjects, /* set list of objects */
+ AP_SetColumns /* change list of columns for a table */
} AlterPublicationAction;
typedef struct AlterPublicationStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..7a5cb9871d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel, HeapTuple newtuple,
- bool binary);
+ bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
- HeapTuple newtuple, bool binary);
+ HeapTuple newtuple, bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
- Relation rel);
+ Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 12c5f67080..44de5fa8a2 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,29 @@ Publications:
regress_publication_user | t | t | t | f | f | f
(1 row)
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error
+ERROR: column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error
+ERROR: invalid column list for publishing relation "testpub_tbl5"
+DETAIL: All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ERROR: cannot drop column "c" because it is part of publication "testpub_fortable"
+HINT: Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+ERROR: column list must not be specified in ALTER PUBLICATION ... DROP
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error
+ERROR: invalid column list for publishing relation "testpub_tbl6"
+DETAIL: Cannot have column filter on relations with REPLICA IDENTITY FULL.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+ ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c); -- error
+ERROR: cannot change column set for relation "testpub_tbl6"
+DETAIL: Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
CREATE TABLE testpub_tbl3 (a int);
CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -670,6 +692,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
Tables from schemas:
"pub_test1"
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR: syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR: column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+ ^
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..b625d161cb 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,21 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
\d+ testpub_tbl2
\dRp+ testpub_foralltables
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+ ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c); -- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +376,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
\dRp+ testpub1_forschema
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/028_column_list.pl b/src/test/subscription/t/028_column_list.pl
new file mode 100644
index 0000000000..f2b26176ed
--- /dev/null
+++ b/src/test/subscription/t/028_column_list.pl
@@ -0,0 +1,164 @@
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test partial-column publication of tables
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+ INSERT INTO tab2 VALUES (2, 'foo', 2);");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+ "select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column tab1.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column tab3.b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column tab1.c is not replicated');
+
+# Verify user-defined types
+$node_publisher->safe_psql('postgres',
+ qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+ CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+ ALTER PUBLICATION pub1 ADD TABLE test_tab4 (a, b, d);
+ });
+$node_subscriber->safe_psql('postgres',
+ qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+ CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, d text);
+ });
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab4 VALUES (1, 'red', 3, 'oh my');");
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1,'abc',3)");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab2 SET c = 5 where a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab2 WHERE a = 1");
+is($result, qq(1|abc), 'insert on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab2 WHERE a = 2");
+is($result, qq(2|foo), 'update on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_tab4");
+is($result, qq(1|red|oh my), 'insert on table with user-defined type');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d)");
+$node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5;"),
+ qq(1|11|1111
+2|22|2222),
+ 'overlapping publications with overlapping column lists');
--
2.30.2