Hmm, I messed up the patch file I sent. Here's the complete patch.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"Doing what he did amounts to sticking his fingers under the hood of the
implementation; if he gets his fingers burnt, it's his problem." (Tom Lane)
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..c86055b93c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@@ -110,6 +110,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
+ Optionally, a column list can be specified. See <xref
+ linkend="sql-createpublication"/> for details.
</para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
publication, so they are never explicitly added to the publication.
</para>
+ <para>
+ When a column list is specified, only the listed columns are replicated;
+ any other columns are ignored for the purpose of replication through
+ this publication. If no column list is specified, all columns of the
+ table are replicated through this publication, including any columns
+ added later. If a column list is specified, it must include the replica
+ identity columns.
+ </para>
+
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index fe9c714257..a88d12e8ae 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -1472,7 +1472,7 @@ doDeletion(const ObjectAddress *object, int flags)
break;
case OCLASS_PUBLICATION_REL:
- RemovePublicationRelById(object->objectId);
+ RemovePublicationRelById(object->objectId, object->objectSubId);
break;
case OCLASS_PUBLICATION:
@@ -2754,8 +2754,12 @@ free_object_addresses(ObjectAddresses *addrs)
ObjectClass
getObjectClass(const ObjectAddress *object)
{
- /* only pg_class entries can have nonzero objectSubId */
+ /*
+ * only pg_class and pg_publication_rel entries can have nonzero
+ * objectSubId
+ */
if (object->classId != RelationRelationId &&
+ object->classId != PublicationRelRelationId &&
object->objectSubId != 0)
elog(ERROR, "invalid non-zero objectSubId for object class %u",
object->classId);
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 2bae3fbb17..5eed248dcb 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -4019,6 +4019,7 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
/* translator: first %s is, e.g., "table %s" */
appendStringInfo(&buffer, _("publication of %s in publication %s"),
rel.data, pubname);
+ /* FIXME add objectSubId support */
pfree(rel.data);
ReleaseSysCache(tup);
break;
@@ -5853,9 +5854,16 @@ getObjectIdentityParts(const ObjectAddress *object,
getRelationIdentity(&buffer, prform->prrelid, objname, false);
appendStringInfo(&buffer, " in publication %s", pubname);
+ if (object->objectSubId) /* FIXME maybe get_attname */
+ appendStringInfo(&buffer, " column %d", object->objectSubId);
if (objargs)
+ {
*objargs = list_make1(pubname);
+ if (object->objectSubId)
+ *objargs = lappend(*objargs,
+ psprintf("%d", object->objectSubId));
+ }
ReleaseSysCache(tup);
break;
diff --git a/src/backend/catalog/pg_depend.c b/src/backend/catalog/pg_depend.c
index 5f37bf6d10..dfcb450e61 100644
--- a/src/backend/catalog/pg_depend.c
+++ b/src/backend/catalog/pg_depend.c
@@ -658,6 +658,56 @@ isObjectPinned(const ObjectAddress *object)
* Various special-purpose lookups and manipulations of pg_depend.
*/
+/*
+ * Find all objects of the given class whose pg_depend entry references
+ * exactly (refclassId, refobjectId, refobjsubId), and add them to the given
+ * ObjectAddresses, each identified by its own sub-object id.
+ */
+void
+findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+ Oid refclassId, Oid refobjectId, int32 refobjsubId)
+{
+ Relation depRel;
+ ScanKeyData key[3];
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ depRel = table_open(DependRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_depend_refclassid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(refclassId));
+ ScanKeyInit(&key[1],
+ Anum_pg_depend_refobjid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(refobjectId));
+ ScanKeyInit(&key[2],
+ Anum_pg_depend_refobjsubid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(refobjsubId));
+
+ scan = systable_beginscan(depRel, DependReferenceIndexId, true,
+ NULL, 3, key);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);
+ ObjectAddress object;
+
+ /* classid is not among the scan keys, so filter dependents here */
+ if (depform->classid != classId)
+ continue;
+ /* Identify the dependent by its own sub-id, not the referenced one */
+ ObjectAddressSubSet(object, depform->classid, depform->objid,
+ depform->objsubid);
+ add_exact_object_address(&object, addrs);
+ }
+
+ systable_endscan(scan);
+ table_close(depRel, AccessShareLock);
+}
+
/*
* Find the extension containing the specified object, if any
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..ae58adc8e5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -46,12 +46,18 @@
#include "utils/syscache.h"
/*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * columns is the bitmapset of columns specified as column filter
+ * (shifted by FirstLowInvalidHeapAttributeNumber), or NULL if no column
+ * filter was specified.
*/
static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
{
+ bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +88,40 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
+
+ /*
+ * Enforce that the column filter can only leave out columns that aren't
+ * forced to be sent.
+ *
+ * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+ * columns need to be sent regardless); and in other cases, the columns in
+ * the REPLICA IDENTITY cannot be left out.
+ */
+ if (columns != NULL)
+ {
+ if (replidentfull)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("invalid column list for publishing relation \"%s\"",
+ RelationGetRelationName(targetrel)),
+ errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+ else
+ {
+ Bitmapset *idattrs;
+
+ idattrs = RelationGetIndexAttrBitmap(targetrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ if (!bms_is_subset(idattrs, columns))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("invalid column list for publishing relation \"%s\"",
+ RelationGetRelationName(targetrel)),
+ errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+
+ if (idattrs)
+ pfree(idattrs);
+ }
+ }
}
/*
@@ -289,9 +329,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
Oid relid = RelationGetRelid(targetrel->relation);
Oid prrelid;
Publication *pub = GetPublication(pubid);
+ Bitmapset *attmap = NULL;
+ AttrNumber *attarray;
+ int natts = 0;
+ int attnum;
ObjectAddress myself,
referenced;
List *relids = NIL;
+ ListCell *lc;
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -305,6 +350,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
{
table_close(rel, RowExclusiveLock);
+ /* FIXME need to handle the case of different column list */
+
if (if_not_exists)
return InvalidObjectAddress;
@@ -314,7 +361,34 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
RelationGetRelationName(targetrel->relation), pub->name)));
}
- check_publication_add_relation(targetrel->relation);
+ attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns));
+ foreach(lc, targetrel->columns)
+ {
+ char *colname = strVal(lfirst(lc));
+ AttrNumber attnum = get_attnum(relid, colname);
+
+ if (attnum == InvalidAttrNumber)
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colname, RelationGetRelationName(targetrel->relation)));
+ if (attnum < 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot reference system column \"%s\" in publication column list",
+ colname));
+
+ if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attmap))
+ ereport(ERROR,
+ errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("column \"%s\" specified twice in publication column list",
+ colname));
+
+ attmap = bms_add_member(attmap, attnum - FirstLowInvalidHeapAttributeNumber);
+ attarray[natts++] = attnum;
+ }
+
+ check_publication_add_relation(targetrel->relation, attmap);
/* Form a tuple. */
memset(values, 0, sizeof(values));
@@ -327,6 +401,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
ObjectIdGetDatum(pubid);
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);
+ if (targetrel->columns)
+ {
+ int2vector *prattrs;
+
+ prattrs = buildint2vector(attarray, natts);
+ values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+ }
+ else
+ nulls[Anum_pg_publication_rel_prattrs - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -344,6 +427,21 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
ObjectAddressSet(referenced, RelationRelationId, relid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+ /*
+ * If there's an explicit column list, make one dependency entry for each
+ * column. Note that the referencing side of the dependency is also
+ * specific to one column, so that it can be dropped separately if the
+ * column is dropped.
+ */
+ while ((attnum = bms_first_member(attmap)) >= 0)
+ {
+ ObjectAddressSubSet(referenced, RelationRelationId, relid,
+ attnum + FirstLowInvalidHeapAttributeNumber);
+ myself.objectSubId = attnum + FirstLowInvalidHeapAttributeNumber;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+ myself.objectSubId = 0; /* need to undo this bit */
+
/* Close the table. */
table_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..a070914bdd 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -561,7 +561,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
pubrel = palloc(sizeof(PublicationRelInfo));
pubrel->relation = oldrel;
-
+ pubrel->columns = NIL;
delrels = lappend(delrels, pubrel);
}
}
@@ -757,10 +757,11 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
}
/*
- * Remove relation from publication by mapping OID.
+ * Remove relation from publication by mapping OID; or, if an attnum is given,
+ * remove just that column from the relation's column list in the publication.
*/
void
-RemovePublicationRelById(Oid proid)
+RemovePublicationRelById(Oid proid, int32 attnum)
{
Relation rel;
HeapTuple tup;
@@ -790,7 +791,81 @@ RemovePublicationRelById(Oid proid)
InvalidatePublicationRels(relids);
- CatalogTupleDelete(rel, &tup->t_self);
+ /*
+ * If no column is given, simply delete the relation from the publication.
+ *
+ * If a column is given, what we do instead is to remove that column from
+ * the column list. The relation remains in the publication, with the
+ * other columns. However, dropping the last column is disallowed.
+ */
+ if (attnum == 0)
+ {
+ CatalogTupleDelete(rel, &tup->t_self);
+ }
+ else
+ {
+ Datum adatum;
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
+ int16 *newelems;
+ int2vector *newvec;
+ Datum values[Natts_pg_publication_rel];
+ bool nulls[Natts_pg_publication_rel];
+ bool replace[Natts_pg_publication_rel];
+ HeapTuple newtup;
+ int i,
+ j;
+ bool isnull;
+
+ /* Obtain the original column list */
+ adatum = SysCacheGetAttr(PUBLICATIONRELMAP,
+ tup,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+ if (isnull) /* shouldn't happen */
+ elog(ERROR, "can't drop column from publication without a column list");
+ arr = DatumGetArrayTypeP(adatum);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ /* Copy every element except attnum; size for the not-found case too */
+ newelems = palloc(sizeof(int16) * nelems);
+ for (i = 0, j = 0; i < nelems; i++)
+ {
+ if (elems[i] == attnum)
+ continue;
+ newelems[j++] = elems[i];
+ }
+
+ /*
+ * If this is the last column used in the publication, disallow the
+ * command. We could alternatively just drop the relation from the
+ * publication.
+ */
+ if (j == 0)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot drop the last column in publication \"%s\"",
+ get_publication_name(pubrel->prpubid, false)),
+ errhint("Remove table \"%s\" from the publication first.",
+ get_rel_name(pubrel->prrelid)));
+ }
+
+ /* Build the updated tuple */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, false, sizeof(nulls));
+ MemSet(replace, false, sizeof(replace));
+ newvec = buildint2vector(newelems, j);
+ values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(newvec);
+ replace[Anum_pg_publication_rel_prattrs - 1] = true;
+
+ /* Execute the update */
+ newtup = heap_modify_tuple(tup, RelationGetDescr(rel),
+ values, nulls, replace);
+ CatalogTupleUpdate(rel, &tup->t_self, newtup);
+ }
ReleaseSysCache(tup);
@@ -932,6 +1007,8 @@ OpenTableList(List *tables)
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ pub_rel->columns = t->columns;
+
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
@@ -965,8 +1042,11 @@ OpenTableList(List *tables)
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
+
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ pub_rel->columns = t->columns;
+
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
}
@@ -1074,6 +1154,12 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
+ if (pubrel->columns)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column list may not be specified for relation \"%s\" in ALTER PUBLICATION ... SET/DROP command",
+ RelationGetRelationName(pubrel->relation)));
+
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 47b29001d5..7207dcf9c0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,9 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
+#include "catalog/pg_publication_rel.h"
#include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
@@ -8420,6 +8421,13 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
ReleaseSysCache(tuple);
+ /*
+ * If the column is part of a replication column list, arrange to get that
+ * removed too.
+ */
+ findAndAddAddresses(addrs, PublicationRelRelationId,
+ RelationRelationId, RelationGetRelid(rel), attnum);
+
/*
* Propagate to children as appropriate. Unlike most other ALTER
* routines, we have to do this one level of recursion at a time; we can't
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
PublicationTable *newnode = makeNode(PublicationTable);
COPY_NODE_FIELD(relation);
+ COPY_NODE_FIELD(columns);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
+ COMPARE_NODE_FIELD(columns);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..4dad6fedfb 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9742,12 +9742,13 @@ CreatePublicationStmt:
* relation_expr here.
*/
PublicationObjSpec:
- TABLE relation_expr
+ TABLE relation_expr opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
+ $$->pubtable->columns = $3;
}
| ALL TABLES IN_P SCHEMA ColId
{
@@ -9762,28 +9763,38 @@ PublicationObjSpec:
$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
$$->location = @5;
}
- | ColId
+ | ColId opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
- $$->name = $1;
+ if ($2 != NULL)
+ {
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+ $$->pubtable->columns = $2;
+ $$->name = NULL;
+ }
+ else
+ $$->name = $1;
$$->location = @1;
}
- | ColId indirection
+ | ColId indirection opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+ $$->pubtable->columns = $3;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
- | extended_relation_expr
+ | extended_relation_expr opt_column_list
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
+ $$->pubtable->columns = $2;
}
| CURRENT_SCHEMA
{
@@ -17435,8 +17446,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
{
/* convert it to PublicationTable */
PublicationTable *pubtable = makeNode(PublicationTable);
- pubtable->relation = makeRangeVar(NULL, pubobj->name,
- pubobj->location);
+
+ pubtable->relation =
+ makeRangeVar(NULL, pubobj->name, pubobj->location);
pubobj->pubtable = pubtable;
pubobj->name = NULL;
}
@@ -17444,6 +17456,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
{
+ /*
+ * This can happen if a column list is specified in a continuation
+ * for a schema entry; reject it.
+ */
+ if (pubobj->pubtable)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("column specification not allowed for schemas"),
+ parser_errposition(pubobj->location));
+
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..15d8192238 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,9 @@
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple, bool binary);
+ HeapTuple tuple, bool binary, Bitmapset *att_map);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple newtuple, bool binary)
+ HeapTuple newtuple, bool binary, Bitmapset *att_map)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
}
/*
@@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+ HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldtuple, binary, att_map);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
}
/*
@@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
}
/*
@@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map)
{
char *relname;
@@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
- logicalrep_write_attrs(out, rel);
+ logicalrep_write_attrs(out, rel, att_map);
}
/*
@@ -749,20 +749,42 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary,
+ Bitmapset *att_map)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
bool isnull[MaxTupleAttributeNumber];
int i;
uint16 nliveatts = 0;
+ Bitmapset *idattrs = NULL;
+ bool replidentfull;
+ Form_pg_attribute att;
desc = RelationGetDescr(rel);
+ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+ if (!replidentfull)
+ idattrs = RelationGetIdentityKeyBitmap(rel);
+
for (i = 0; i < desc->natts; i++)
{
+ att = TupleDescAttr(desc, i);
if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
continue;
+
+ /*
+ * Do not increment count of attributes if not a part of column
+ * filters except for replica identity columns or if replica identity
+ * is full.
+ */
+ if (att_map != NULL &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ att_map) &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ idattrs) &&
+ !replidentfull)
+ continue;
nliveatts++;
}
pq_sendint16(out, nliveatts);
@@ -800,6 +822,19 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
continue;
}
+ /*
+ * Do not send attribute data if it is not a part of column filters,
+ * unless it is a part of REPLICA IDENTITY or REPLICA IDENTITY is full,
+ * in which case the data is always sent.
+ */
+ if (att_map != NULL &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ att_map) &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ idattrs) &&
+ !replidentfull)
+ continue;
+
typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
if (!HeapTupleIsValid(typtup))
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
@@ -904,7 +939,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
{
TupleDesc desc;
int i;
@@ -914,20 +949,35 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
desc = RelationGetDescr(rel);
- /* send number of live attributes */
- for (i = 0; i < desc->natts; i++)
- {
- if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
- continue;
- nliveatts++;
- }
- pq_sendint16(out, nliveatts);
-
/* fetch bitmap of REPLICATION IDENTITY attributes */
replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
if (!replidentfull)
idattrs = RelationGetIdentityKeyBitmap(rel);
+ /* send number of live attributes */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+ /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+ if (replidentfull ||
+ bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ idattrs))
+ {
+ nliveatts++;
+ continue;
+ }
+ /* Skip sending if not a part of column filter */
+ if (att_map != NULL &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ att_map))
+ continue;
+ nliveatts++;
+ }
+ pq_sendint16(out, nliveatts);
+
/* send the attributes */
for (i = 0; i < desc->natts; i++)
{
@@ -937,6 +987,17 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
if (att->attisdropped || att->attgenerated)
continue;
+ /*
+ * Exclude filtered columns, but REPLICA IDENTITY columns can't be
+ * excluded
+ */
+ if (att_map != NULL &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ att_map) &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ idattrs)
+ && !replidentfull)
+ continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..15902faf56 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -695,19 +696,25 @@ fetch_remote_table_info(char *nspname, char *relname,
LogicalRepRelation *lrel)
{
WalRcvExecResult *res;
+ WalRcvExecResult *res_pub;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
+ TupleTableSlot *slot_pub;
+ Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid pubRow[] = {TEXTARRAYOID};
bool isnull;
int natt;
+ List *pub_columns = NIL;
+ ListCell *lc;
+ bool am_partition = false;
lrel->nspname = nspname;
lrel->relname = relname;
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+ appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
" FROM pg_catalog.pg_class c"
" INNER JOIN pg_catalog.pg_namespace n"
" ON (c.relnamespace = n.oid)"
@@ -737,6 +744,7 @@ fetch_remote_table_info(char *nspname, char *relname,
Assert(!isnull);
lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
+ am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull)); /* relispartition is BOOLOID */
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
@@ -774,11 +782,101 @@ fetch_remote_table_info(char *nspname, char *relname,
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+
+ /*
+ * Now, fetch the values of publications' column filters.
+ *
+ * For a partition, pg_publication_rel contains only the topmost parent
+ * table entry in case the table is partitioned, so the partition's own
+ * OID may not be found there. Instead, use pg_partition_root and
+ * pg_partition_tree to enumerate the partition hierarchy, and retrieve
+ * the record for whichever member exists in pg_publication_rel.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfoString(&cmd,
+ "SELECT CASE WHEN prattrs IS NOT NULL THEN\n"
+ " ARRAY(SELECT attname\n"
+ " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(prattrs::int[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = prrelid AND attnum = prattrs[s])\n"
+ " ELSE NULL END AS columns\n"
+ "FROM pg_catalog.pg_publication_rel\n");
+ if (!am_partition)
+ appendStringInfo(&cmd, "WHERE prrelid = %u", lrel->remoteid);
+ else
+ appendStringInfo(&cmd,
+ "WHERE prrelid IN (SELECT relid \n"
+ "FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+ lrel->remoteid);
+
+ res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(pubRow), pubRow);
+
+ if (res_pub->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res_pub->err)));
+ slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple);
+
+ while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub))
+ {
+ Datum adatum;
+ Datum *elems = NULL;
+ bool *nulls = NULL;
+ int nelems = 0;
+
+ /* A NULL array means no column list for this entry; it adds no filter */
+ adatum = slot_getattr(slot_pub, 1, &isnull);
+ if (!isnull)
+ deconstruct_array(DatumGetArrayTypeP(adatum),
+ TEXTOID, -1, false, TYPALIGN_INT,
+ &elems, &nulls, &nelems);
+ for (int i = 0; i < nelems; i++)
+ {
+ if (nulls[i]) /* shouldn't happen */
+ elog(ERROR, "unexpected null value in publication column filter");
+ pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i]));
+ }
+ ExecClearTuple(slot_pub);
+ }
+ ExecDropSingleTupleTableSlot(slot_pub);
+ walrcv_clear_result(res_pub);
+
+ /*
+ * Store the column names only if they are contained in column filter.
+ * LogicalRepRelation will only contain attributes corresponding to those
+ * specified in column filters.
+ */
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
- lrel->attnames[natt] =
- TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ char *rel_colname;
+ bool found = false;
+
+ rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
+ if (pub_columns != NIL)
+ {
+ foreach(lc, pub_columns)
+ {
+ char *pub_colname = lfirst(lc);
+
+ if (!strcmp(pub_colname, rel_colname))
+ {
+ found = true;
+ lrel->attnames[natt] = rel_colname;
+ break;
+ }
+ }
+ }
+ else
+ {
+ found = true;
+ lrel->attnames[natt] = rel_colname;
+ }
+ if (!found)
+ continue;
+
lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
@@ -829,8 +927,17 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
if (lrel.relkind == RELKIND_RELATION)
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ {
+ appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)	/* comma-separated column list */
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfoString(&cmd, ") TO STDOUT");
+ }
else
{
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..f9f9ecd0c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
+#include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx);
+ LogicalDecodingContext *ctx,
+ Bitmapset *att_map);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@@ -130,6 +134,7 @@ typedef struct RelationSyncEntry
* having identical TupleDesc.
*/
TupleConversionMap *map;
+ Bitmapset *att_map;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
}
MemoryContextSwitchTo(oldctx);
- send_relation_and_attrs(ancestor, xid, ctx);
+ send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, xid, ctx);
+ send_relation_and_attrs(relation, xid, ctx, relentry->att_map);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
*/
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx)
+ LogicalDecodingContext *ctx,
+ Bitmapset *att_map)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@@ -610,13 +616,25 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
if (att->atttypid < FirstGenbkiObjectId)
continue;
+ /*
+ * Do not send type information if attribute is not present in column
+ * filter. XXX Allow sending type information for REPLICA IDENTITY
+		 * COLUMNS with user created type, even when they are not mentioned in
+ * column filters.
+ *
+ * FIXME -- this code seems not verified by tests.
+ */
+ if (att_map != NULL &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ att_map))
+ continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, xid, relation);
+ logicalrep_write_rel(ctx->out, xid, relation, att_map);
OutputPluginWrite(ctx, false);
}
@@ -693,7 +711,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, relation, tuple,
- data->binary);
+ data->binary, relentry->att_map);
OutputPluginWrite(ctx, true);
break;
}
@@ -722,7 +740,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, xid, relation, oldtuple,
- newtuple, data->binary);
+ newtuple, data->binary, relentry->att_map);
OutputPluginWrite(ctx, true);
break;
}
@@ -1122,6 +1140,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
bool found;
+ Oid ancestor_id;
MemoryContext oldctx;
Assert(RelationSyncCache != NULL);
@@ -1142,6 +1161,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->publish_as_relid = InvalidOid;
+ entry->att_map = NULL;
entry->map = NULL; /* will be set by maybe_send_schema() if
* needed */
}
@@ -1182,6 +1202,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
{
Publication *pub = lfirst(lc);
bool publish = false;
+ bool ancestor_published = false;
if (pub->alltables)
{
@@ -1192,8 +1213,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
if (!publish)
{
- bool ancestor_published = false;
-
/*
* For a partition, check if any of the ancestors are
* published. If so, note down the topmost ancestor that is
@@ -1219,6 +1238,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
pub->oid))
{
ancestor_published = true;
+ ancestor_id = ancestor;
if (pub->pubviaroot)
publish_as_relid = ancestor;
}
@@ -1239,15 +1259,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
if (publish &&
(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
+ Oid relid;
+ HeapTuple pub_rel_tuple;
+
+ relid = ancestor_published ? ancestor_id : publish_as_relid;
+ pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(pub_rel_tuple))
+ {
+ Datum pub_rel_cols;
+ bool isnull;
+
+ pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+ pub_rel_tuple,
+ Anum_pg_publication_rel_prattrs,
+ &isnull);
+ if (!isnull)
+ {
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
+
+ arr = DatumGetArrayTypeP(pub_rel_cols);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ /* XXX is there a danger of memory leak here? beware */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ for (int i = 0; i < nelems; i++)
+ entry->att_map = bms_add_member(entry->att_map,
+ elems[i] - FirstLowInvalidHeapAttributeNumber);
+ MemoryContextSwitchTo(oldctx);
+ }
+ ReleaseSysCache(pub_rel_tuple);
+ }
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
}
-
- if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
- break;
}
list_free(pubids);
@@ -1343,6 +1395,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
+ bms_free(entry->att_map);
+ entry->att_map = NULL;
if (entry->map)
{
/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 10a86f9810..0c438481dc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4265,6 +4265,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
int i_oid;
int i_prpubid;
int i_prrelid;
+ int i_prattrs;
int i,
j,
ntups;
@@ -4276,8 +4277,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
/* Collect all publication membership info. */
appendPQExpBufferStr(query,
- "SELECT tableoid, oid, prpubid, prrelid "
- "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid");
+	if (fout->remoteVersion >= 150000)	/* prattrs holds attnums; fetch names */
+		appendPQExpBufferStr(query, ", (SELECT pg_catalog.array_agg(a.attname ORDER BY a.attnum) FROM pg_catalog.pg_attribute a"
+							 " WHERE a.attrelid = pr.prrelid AND a.attnum = ANY (pr.prattrs::pg_catalog.int2[])) AS prattrs");
+	else
+		appendPQExpBufferStr(query, ", NULL as prattrs");
+	appendPQExpBufferStr(query, " FROM pg_catalog.pg_publication_rel pr");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
@@ -4286,6 +4292,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
i_oid = PQfnumber(res, "oid");
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
+ i_prattrs = PQfnumber(res, "prattrs");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4327,6 +4334,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
pubrinfo[j].publication = pubinfo;
pubrinfo[j].pubtable = tbinfo;
+		/* Flatten the prattrs array, if present, into a comma-separated list. */
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs), &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			/* i walks the result rows; j is this entry's slot in pubrinfo. */
+			pubrinfo[j].pubrattrs = attribs->data;
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
/* Decide whether we want to dump it */
selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
@@ -4391,10 +4420,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
query = createPQExpBuffer();
- appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+ appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
fmtId(pubinfo->dobj.name));
- appendPQExpBuffer(query, " %s;\n",
- fmtQualifiedDumpable(tbinfo));
+ appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+ if (pubrinfo->pubrattrs)
+ appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+ appendPQExpBufferStr(query, ";\n");
/*
* There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 6dccb4be4e..50a5b885f6 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -633,6 +633,7 @@ typedef struct _PublicationRelInfo
DumpableObject dobj;
PublicationInfo *publication;
TableInfo *pubtable;
+ char *pubrattrs;
} PublicationRelInfo;
/*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 72d8547628..46fa616406 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6302,7 +6302,7 @@ listPublications(const char *pattern)
*/
static bool
addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
- bool singlecol, printTableContent *cont)
+ bool as_schema, printTableContent *cont)
{
PGresult *res;
int count = 0;
@@ -6319,10 +6319,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
for (i = 0; i < count; i++)
{
- if (!singlecol)
+ if (!as_schema) /* as table */
+ {
printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
- else
+ if (!PQgetisnull(res, i, 2))
+ appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+ }
+ else /* as schema */
printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
printTableAddFooter(cont, buf->data);
@@ -6450,8 +6454,20 @@ describePublications(const char *pattern)
{
/* Get the tables for the specified publication */
printfPQExpBuffer(&buf,
- "SELECT n.nspname, c.relname\n"
- "FROM pg_catalog.pg_class c,\n"
+ "SELECT n.nspname, c.relname, \n");
+ if (pset.sversion >= 150000)
+ appendPQExpBufferStr(&buf,
+ " CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+ " pg_catalog.array_to_string"
+ "(ARRAY(SELECT attname\n"
+ " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
+ " pg_catalog.pg_attribute\n"
+ " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+ " ELSE NULL END AS columns");
+ else
+ appendPQExpBufferStr(&buf, "NULL as columns");
+ appendPQExpBuffer(&buf,
+ "\nFROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
" pg_catalog.pg_publication_rel pr\n"
"WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..84ee807e0b 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> ADD */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+ COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
/* ALTER PUBLICATION <name> DROP */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 3eca295ff4..76d421e09e 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -214,6 +214,9 @@ extern long changeDependenciesOf(Oid classId, Oid oldObjectId,
extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
Oid newRefObjectId);
+extern void findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+ Oid refclassId, Oid refobjectId, int32 refobjsubId);
+
extern Oid getExtensionOfObject(Oid classId, Oid objectId);
extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..f5ae2065e9 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
typedef struct PublicationRelInfo
{
Relation relation;
+ List *columns;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..7ad285faae 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
Oid oid; /* oid */
Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */
+#ifdef CATALOG_VARLEN
+ int2vector prattrs; /* Variable length field starts here */
+#endif
} FormData_pg_publication_rel;
/* ----------------
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 4ba68c70ee..23f037df7f 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -25,7 +25,7 @@
extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
extern void RemovePublicationById(Oid pubid);
-extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationRelById(Oid proid, int32 attnum);
extern void RemovePublicationSchemaById(Oid psoid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..02b547d044 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
{
NodeTag type;
RangeVar *relation; /* relation to be published */
+ List *columns; /* List of columns in a publication table */
} PublicationTable;
/*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..709b4be916 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel, HeapTuple newtuple,
- bool binary);
+ bool binary, Bitmapset *att_map);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
- HeapTuple newtuple, bool binary);
+ HeapTuple newtuple, bool binary, Bitmapset *att_map);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
- Relation rel);
+ Relation rel, Bitmapset *att_map);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..84afe0ebef 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,35 @@ Publications:
regress_publication_user | t | t | t | f | f | f
(1 row)
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z); -- error
+ERROR: column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error
+ERROR: column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error
+ERROR: invalid column list for publishing relation "testpub_tbl5"
+DETAIL: All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+ Publication testpub_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | f
+Tables:
+ "public.testpub_tbl5" (a)
+Tables from schemas:
+ "pub_test"
+
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+ERROR: cannot drop the last column in publication "testpub_fortable"
+HINT: Remove table "testpub_tbl5" from the publication first.
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error
+ERROR: invalid column list for publishing relation "testpub_tbl6"
+DETAIL: Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
CREATE TABLE testpub_tbl3 (a int);
CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -669,6 +697,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
Tables from schemas:
"pub_test1"
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR: syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR: column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+ ^
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..200158ba69 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,20 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
\d+ testpub_tbl2
\dRp+ testpub_foralltables
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +375,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
\dRp+ testpub1_forschema
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..354e6ac363
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,162 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test partial-column publication of tables
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+ "select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres', "INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+# Let the inserts reach the subscriber before inspecting its tables.
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column c is not replicated');
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1,'abc',3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab2");
+is($result, qq(1|abc), 'insert on column c is not replicated');
+
+$node_publisher->safe_psql('postgres', "UPDATE tab2 SET c = 5 where a = 1");
+$node_publisher->wait_for_catchup('sub1');	# let the UPDATE reach the subscriber
+is($node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"), qq(1|abc), 'update on column c is not replicated');
+
+# Test behavior when a column is dropped
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE test_part DROP COLUMN b");
+$result = $node_publisher->safe_psql('postgres',
+ "select prrelid::regclass, prattrs from pg_publication_rel pb;");
+is($result,
+ q(tab1|1 2
+tab3|1 3
+tab2|1 2
+test_part|1), 'column test_part.b removed');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_part VALUES (3, '2021-12-13 12:13:14')");
+$node_publisher->wait_for_catchup('sub1');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM test_part WHERE a = 3");
+is($result, "3|", 'only column a is replicated');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab4 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab4 (a, d)");
+$node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab4;"),
+ qq(1|11|1111
+2|22|2222),
+ 'overlapping publications with overlapping column lists');