On Tue, 29 Oct 2024 at 11:19, Hayato Kuroda (Fujitsu) <kuroda.hay...@fujitsu.com> wrote:
>
> Dear Shubham,
>
> Thanks for updating the patch! Here are my comments for v44.
>
> 01. fetch_remote_table_info()
>
> `bool *remotegencolpresent` is accessed unconditionally, but it can cause
> crash if NULL is passed to the function. Should we add an Assert to verify it?

I have not made any changes for this as I felt it is not required.
Also Amit felt the same way as in [1].

> 02. fetch_remote_table_info()
>
> ```
> +		if (server_version >= 180000)
> +			*remotegencolpresent |= DatumGetBool(slot_getattr(slot, 5, &isnull));
> +
> ```
>
> Can we add Assert(!isnull) like other parts?

Included it.

> 03. fetch_remote_table_info()
>
> Also, we do not have to reach here once *remotegencolpresent becomes true.
> Based on 02 and 03, how about below?
>
> ```
> if (server_version >= 180000 && !(*remotegencolpresent))
> {
> 	*remotegencolpresent |=
> 		DatumGetBool(slot_getattr(slot, 5, &isnull));
> 	Assert(!isnull);
> }
> ```

Modified

> 04. pgoutput_column_list_init()
>
> +	if (att->attgenerated)
> +	{
> +		if (bms_is_member(att->attnum, cols))
> +			gencolpresent = true;
> +
> 		continue;
> +	}
>
> I'm not sure it is correct. Why do you skip the generated column even when it
> is in the column list? Also, can you add comments what you want to do?

Modified it now and added comments.

The v45 version attached has the changes for the same.

[1] - https://www.postgresql.org/message-id/CAA4eK1Keq7hewXGe4mUHuCzEA5%3DZR5wQK0%2BL5TU%2BzVFUMrmOFw%40mail.gmail.com

Regards,
Vignesh
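For readers following point 04, the per-column rule that v45 centralizes in logicalrep_should_publish_column() can be tried out in isolation. The following is only a minimal sketch under stated assumptions, not the PostgreSQL code: DemoAttr, DemoColList, demo_should_publish_column and demo_self_check are hypothetical names, and the column list is modeled as a plain bitmask rather than a Bitmapset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for Form_pg_attribute; not the PostgreSQL struct. */
typedef struct DemoAttr
{
	int			attnum;			/* 1-based column number, < 64 in this demo */
	bool		attisdropped;
	bool		attgenerated;
} DemoAttr;

/*
 * Hypothetical stand-in for the Bitmapset column list: bit N set means
 * attnum N is listed. has_list == false models a NULL column list.
 */
typedef struct DemoColList
{
	bool		has_list;
	uint64_t	members;
} DemoColList;

/* Follows the decision order described in the patch comments. */
static bool
demo_should_publish_column(const DemoAttr *att, const DemoColList *cols)
{
	/* Dropped columns are never published. */
	if (att->attisdropped)
		return false;

	/* Without a column list, generated columns are skipped. */
	if (!cols->has_list)
		return !att->attgenerated;

	/* With a column list, membership decides, generated or not. */
	return ((cols->members >> att->attnum) & 1) != 0;
}

/* Self-check on a table shaped like (a, b GENERATED, c dropped). */
static void
demo_self_check(void)
{
	DemoAttr	a = {1, false, false};
	DemoAttr	b = {2, false, true};
	DemoAttr	c = {3, true, false};
	DemoColList none = {false, 0};
	DemoColList a_and_b = {true, (UINT64_C(1) << 1) | (UINT64_C(1) << 2)};

	assert(demo_should_publish_column(&a, &none));
	assert(!demo_should_publish_column(&b, &none));
	assert(!demo_should_publish_column(&c, &none));
	assert(demo_should_publish_column(&b, &a_and_b));
	assert(!demo_should_publish_column(&c, &a_and_b));
}
```

Calling demo_self_check() from a test program exercises the three cases discussed in the review: no column list, a list that names the generated column, and a dropped column.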
From 6bf7a7d777b39fed8be9fb1c28035e8effa530c2 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Mon, 28 Oct 2024 14:53:27 +0800
Subject: [PATCH v45] Allow logical replication to publish generated columns when explicitly listed.

This patch enables the replication of generated columns in the column list,
which was previously disallowed. Users can now include generated columns in the
publication's column list to replicate their data.

Additionally, generated column data will be copied during initial table
synchronization using the COPY command. Since the standard COPY command does
not allow specifying generated columns, we utilize an alternative syntax if a
generated column is specified in the column list: COPY (SELECT column_name FROM
table_name) TO STDOUT.
---
 doc/src/sgml/protocol.sgml                  |  4 +-
 doc/src/sgml/ref/create_publication.sgml    |  8 +--
 src/backend/catalog/pg_publication.c        | 10 +---
 src/backend/replication/logical/proto.c     | 65 ++++++++++++---------
 src/backend/replication/logical/tablesync.c | 51 +++++++++++-----
 src/backend/replication/pgoutput/pgoutput.c | 28 ++++++---
 src/include/replication/logicalproto.h      |  2 +
 src/test/regress/expected/publication.out   |  6 +-
 src/test/regress/sql/publication.sql        |  6 +-
 src/test/subscription/t/031_column_list.pl  | 41 ++++++++++++-
 10 files changed, 144 insertions(+), 77 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 057c46f3f5..71b6b2a535 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6544,7 +6544,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"

       <para>
        Next, the following message part appears for each column included in
-       the publication (except generated columns):
+       the publication:
       </para>

       <variablelist>
@@ -7477,7 +7477,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
      </variablelist>

      <para>
-      Next, one of the following submessages appears for each column (except generated columns):
+      Next, one of the following submessages appears for each column:

       <variablelist>
        <varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index fd9c5deac9..835a59f8c5 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -88,10 +88,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
      </para>

      <para>
-      When a column list is specified, only the named columns are replicated.
-      If no column list is specified, all columns of the table are replicated
-      through this publication, including any columns added later. It has no
-      effect on <literal>TRUNCATE</literal> commands. See
+      When a column list is specified, only the named columns are replicated. If
+      no column list is specified, all table columns (except generated columns)
+      are replicated through this publication, including any columns added later.
+      It has no effect on <literal>TRUNCATE</literal> commands. See
       <xref linkend="logical-replication-col-lists"/> for details about column
       lists.
      </para>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 7e5e357fd9..17a6093d06 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -500,8 +500,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
  * pub_collist_validate
  *		Process and validate the 'columns' list and ensure the columns are all
  *		valid to use for a publication. Checks for and raises an ERROR for
- *		any; unknown columns, system columns, duplicate columns or generated
- *		columns.
+ *		any unknown columns, system columns, or duplicate columns.
  *
  * Looks up each column's attnum and returns a 0-based Bitmapset of the
  * corresponding attnums.
@@ -511,7 +510,6 @@ pub_collist_validate(Relation targetrel, List *columns)
 {
 	Bitmapset  *set = NULL;
 	ListCell   *lc;
-	TupleDesc	tupdesc = RelationGetDescr(targetrel);

 	foreach(lc, columns)
 	{
@@ -530,12 +528,6 @@ pub_collist_validate(Relation targetrel, List *columns)
 					errmsg("cannot use system column \"%s\" in publication column list",
 						   colname));

-		if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
-			ereport(ERROR,
-					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-					errmsg("cannot use generated column \"%s\" in publication column list",
-						   colname));
-
 		if (bms_is_member(attnum, set))
 			ereport(ERROR,
 					errcode(ERRCODE_DUPLICATE_OBJECT),
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 980f6e2741..cc643d2bd2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -40,19 +40,6 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
 static const char *logicalrep_read_namespace(StringInfo in);

-/*
- * Check if a column is covered by a column list.
- *
- * Need to be careful about NULL, which is treated as a column list covering
- * all columns.
- */
-static bool
-column_in_column_list(int attnum, Bitmapset *columns)
-{
-	return (columns == NULL || bms_is_member(attnum, columns));
-}
-
-
 /*
  * Write BEGIN to the output stream.
  */
@@ -781,10 +768,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);

-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;

 		nliveatts++;
@@ -802,10 +786,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		Form_pg_type typclass;
 		Form_pg_attribute att = TupleDescAttr(desc, i);

-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;

 		if (isnull[i])
@@ -938,10 +919,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);

-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;

 		nliveatts++;
@@ -959,10 +937,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 		uint8		flags = 0;

-		if (att->attisdropped || att->attgenerated)
-			continue;
-
-		if (!column_in_column_list(att->attnum, columns))
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;

 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
@@ -1269,3 +1244,35 @@ logicalrep_message_type(LogicalRepMsgType action)

 	return err_unknown;
 }
+
+/*
+ * Check if the column 'att' of a table should be published.
+ *
+ * 'columns' represents the column list specified for that table in the
+ * publication. Generated columns are allowed in the 'columns' list.
+ */
+bool
+logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
+{
+	if (att->attisdropped)
+		return false;
+
+	/*
+	 * Skip publishing generated columns if they are not included in the
+	 * column list.
+	 */
+	if (!columns && att->attgenerated)
+		return false;
+
+	/*
+	 * Check if a column is covered by a column list.
+	 */
+	if (columns && !bms_is_member(att->attnum, columns))
+		return false;
+
+	/*
+	 * The column is either not a generated or dropped column, or it is
+	 * present in the column list.
+	 */
+	return true;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d4b5d210e3..d1326ac312 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -791,19 +791,20 @@ copy_read_data(void *outbuf, int minread, int maxread)
  * qualifications to be used in the COPY command.
  */
 static void
-fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+						List **qual, bool *gencol_published)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
 	StringInfo	pub_names = NULL;
 	Bitmapset  *included_cols = NULL;
+	int			server_version = walrcv_server_version(LogRepWorkerWalRcvConn);

 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -851,7 +852,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * We need to do this before fetching info about column names and types,
 	 * so that we can skip columns that should not be replicated.
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		WalRcvExecResult *pubres;
 		TupleTableSlot *tslot;
@@ -941,20 +942,22 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "SELECT a.attnum,"
 					 " a.attname,"
 					 " a.atttypid,"
-					 " a.attnum = ANY(i.indkey)"
+					 " a.attnum = ANY(i.indkey)");
+
+	/* Check if the column is generated. */
+	if (server_version >= 180000)
+		appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+	appendStringInfo(&cmd,
 					 " FROM pg_catalog.pg_attribute a"
 					 " LEFT JOIN pg_catalog.pg_index i"
 					 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
 					 " WHERE a.attnum > 0::pg_catalog.int2"
-					 " AND NOT a.attisdropped %s"
+					 " AND NOT a.attisdropped"
 					 " AND a.attrelid = %u"
-					 " ORDER BY a.attnum",
-					 lrel->remoteid,
-					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
-					  "AND a.attgenerated = ''" : ""),
-					 lrel->remoteid);
+					 " ORDER BY a.attnum", lrel->remoteid, lrel->remoteid);
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
-					  lengthof(attrRow), attrRow);
+					  server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);

 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -998,6 +1001,13 @@ fetch_remote_table_info(char *nspname, char *relname,
 		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

+		/* Skip setting 'gencol_published' if it is already set. */
+		if (server_version >= 180000 && !(*gencol_published))
+		{
+			*gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
+			Assert(!isnull);
+		}
+
 		/* Should never happen. */
 		if (++natt >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1040,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
 	 * that includes this relation
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		/* Reuse the already-built pub_names. */
 		Assert(pub_names != NULL);
@@ -1106,10 +1116,12 @@ copy_table(Relation rel)
 	List	   *attnamelist;
 	ParseState *pstate;
 	List	   *options = NIL;
+	bool		gencol_published = false;

 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &lrel, &qual,
+							&gencol_published);

 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1121,8 +1133,11 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);

-	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/*
+	 * Regular table with no row filter and copy of generated columns is not
+	 * necessary.
+	 */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
 	{
 		appendStringInfo(&cmd, "COPY %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1156,6 +1171,10 @@ copy_table(Relation rel)
 		 * (SELECT ...), but we can't just do SELECT * because we need to not
 		 * copy generated columns. For tables with any row filters, build a
 		 * SELECT query with OR'ed row filters for COPY.
+		 *
+		 * We also need to use this same COPY (SELECT ...) syntax when
+		 * generated columns are published, because copy of generated columns
+		 * is not supported by the normal COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00e7024563..12c1735906 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -766,16 +766,12 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);

-		if (att->attisdropped || att->attgenerated)
+		if (!logicalrep_should_publish_column(att, columns))
 			continue;

 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;

-		/* Skip this attribute if it's not present in the column list */
-		if (columns != NULL && !bms_is_member(att->attnum, columns))
-			continue;
-
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
@@ -1074,6 +1070,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 	int			i;
 	int			nliveatts = 0;
 	TupleDesc	desc = RelationGetDescr(relation);
+	bool		att_gen_present = false;

 	pgoutput_ensure_entry_cxt(data, entry);

@@ -1085,17 +1082,30 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 					{
 						Form_pg_attribute att = TupleDescAttr(desc, i);

-						if (att->attisdropped || att->attgenerated)
+						if (att->attisdropped)
 							continue;

+						if (att->attgenerated)
+						{
+							/*
+							 * Generated cols are skipped unless they are
+							 * present in a column list.
+							 */
+							if (!bms_is_member(att->attnum, cols))
+								continue;
+
+							att_gen_present = true;
+						}
+
 						nliveatts++;
 					}

 					/*
-					 * If column list includes all the columns of the table,
-					 * set it to NULL.
+					 * Generated attributes are published only when they are
+					 * present in the column list. Otherwise, a NULL column
+					 * list means publish all columns.
+					 */
-					if (bms_num_members(cols) == nliveatts)
+					if (!att_gen_present && bms_num_members(cols) == nliveatts)
 					{
 						bms_free(cols);
 						cols = NULL;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..b219f22655 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -270,5 +270,7 @@ extern void logicalrep_read_stream_abort(StringInfo in,
 										 LogicalRepStreamAbortData *abort_data,
 										 bool read_abort_info);
 extern const char *logicalrep_message_type(LogicalRepMsgType action);
+extern bool logicalrep_should_publish_column(Form_pg_attribute att,
+											 Bitmapset *columns);

 #endif							/* LOGICAL_PROTO_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 660245ed0c..d2ed1efc3b 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -687,9 +687,6 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR: cannot update table "testpub_tbl5"
 DETAIL: Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR: cannot use generated column "d" in publication column list
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ERROR: cannot use system column "ctid" in publication column list
@@ -717,6 +714,9 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR: cannot update table "testpub_tbl5"
 DETAIL: Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: change the replica identity to "b", and column list to (a, c)
 -- then update fails, because (a, c) does not cover replica identity
 ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index f68a5b5986..12aea71c0f 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -413,8 +413,6 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
@@ -435,6 +433,10 @@ ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;

+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+
 -- error: change the replica identity to "b", and column list to (a, c)
 -- then update fails, because (a, c) does not cover replica identity
 ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9a97fa5020..e54861b599 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1202,9 +1202,10 @@ $result = $node_publisher->safe_psql(
 is( $result, qq(t
 t), 'check the number of columns in the old tuple');

-# TEST: Generated and dropped columns are not considered for the column list.
-# So, the publication having a column list except for those columns and a
-# publication without any column (aka all columns as part of the columns
+# TEST: Dropped columns are not considered for the column list, and generated
+# columns are not replicated if they are not explicitly included in the column
+# list. So, the publication having a column list except for those columns and a
+# publication without any column list (aka all columns as part of the columns
 # list) are considered to have the same column list.
 $node_publisher->safe_psql(
 	'postgres', qq(
@@ -1275,6 +1276,40 @@ ok( $stderr =~
 	  qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
 	'different column lists detected');

+# TEST: Generated columns are considered for the column list.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED);
+	INSERT INTO test_gen VALUES (0);
+	CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b);
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE test_gen (a int PRIMARY KEY, b int);
+	CREATE SUBSCRIPTION sub_gen CONNECTION '$publisher_connstr' PUBLICATION pub_gen;
+));
+
+$node_subscriber->wait_for_subscription_sync;
+
+is( $node_subscriber->safe_psql(
+		'postgres', "SELECT * FROM test_gen ORDER BY a"),
+	qq(0|1),
+	'initial replication with generated columns in column list');
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO test_gen VALUES (1);
+));
+
+$node_publisher->wait_for_catchup('sub_gen');
+
+is( $node_subscriber->safe_psql(
+		'postgres', "SELECT * FROM test_gen ORDER BY a"),
+	qq(0|1
+1|2),
+	'replication with generated columns in column list');
+
 # TEST: If the column list is changed after creating the subscription, we
 # should catch the error reported by walsender.
-- 
2.34.1
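To make the initial-sync part of the commit message concrete for the test_gen case above: when a published column is generated, the subscriber has to fall back from the plain COPY form to COPY (SELECT ...) TO STDOUT. The sketch below only illustrates that choice and is not the copy_table() code. demo_append, demo_build_copy_command and demo_copy_self_check are hypothetical helpers, identifiers are assumed to be already quoted, and row filters and non-table relation kinds (which also use the SELECT form in the patch) are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append s to buf if it fits; returns false when buf is too small. */
static bool
demo_append(char *buf, size_t buflen, size_t *used, const char *s)
{
	size_t		len = strlen(s);

	if (*used + len + 1 > buflen)
		return false;
	memcpy(buf + *used, s, len + 1);	/* copies the terminating NUL too */
	*used += len;
	return true;
}

/*
 * Build the COPY command for initial sync (simplified, hypothetical).
 * gencol_published == true selects the COPY (SELECT ...) form.
 */
static bool
demo_build_copy_command(char *buf, size_t buflen, const char *table,
						const char *const *cols, int ncols,
						bool gencol_published)
{
	size_t		used = 0;
	bool		ok;

	if (buflen == 0 || ncols < 1)
		return false;
	buf[0] = '\0';

	if (gencol_published)
		ok = demo_append(buf, buflen, &used, "COPY (SELECT ");
	else
		ok = demo_append(buf, buflen, &used, "COPY ") &&
			demo_append(buf, buflen, &used, table) &&
			demo_append(buf, buflen, &used, " (");

	/* Comma-separated column names, shared by both forms. */
	for (int i = 0; ok && i < ncols; i++)
	{
		if (i > 0)
			ok = demo_append(buf, buflen, &used, ", ");
		ok = ok && demo_append(buf, buflen, &used, cols[i]);
	}

	if (ok && gencol_published)
		ok = demo_append(buf, buflen, &used, " FROM ") &&
			demo_append(buf, buflen, &used, table);

	return ok && demo_append(buf, buflen, &used, ") TO STDOUT");
}

/* Self-check using the table and columns from the TAP test. */
static void
demo_copy_self_check(void)
{
	const char *cols[] = {"a", "b"};
	char		buf[128];

	assert(demo_build_copy_command(buf, sizeof(buf), "public.test_gen", cols, 2, true));
	assert(strcmp(buf, "COPY (SELECT a, b FROM public.test_gen) TO STDOUT") == 0);

	assert(demo_build_copy_command(buf, sizeof(buf), "public.test_gen", cols, 1, false));
	assert(strcmp(buf, "COPY public.test_gen (a) TO STDOUT") == 0);
}
```

The flag passed as gencol_published plays the role of the value that fetch_remote_table_info() reports back in the patch; here it is simply supplied by the caller.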