On 4/3/18 22:31, Peter Eisentraut wrote:
> The problem I see now is that when we WAL-log a truncate, we include all
> the relids in one record. That seems useful. But during decoding we
> split this into one change per relid. So at the receiving end, truncate
> can only process one relation at a time, which will fail if there are
> foreign keys involved. The solution that had been proposed here was to
> ignore foreign keys on the subscriber, which is clearly problematic.
>
> I'm going to try to hack up an alternative approach and see how it works
> out.

Done here. I added a separate callback for decoding truncates, which receives all the relations at once. That information can then be shipped off together and applied together on the receiving side. So foreign keys are not a problem anymore. This ended up being a bit larger than the original patch, but I think it's sound behavior and future-proof.

--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From df101e32c358ac9243285d4e8f125803988e5508 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <pete...@gmx.net> Date: Thu, 5 Apr 2018 11:46:41 -0400 Subject: [PATCH v19 1/2] Logical decoding of TRUNCATE --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/truncate.out | 25 ++++++ contrib/test_decoding/sql/truncate.sql | 10 +++ contrib/test_decoding/test_decoding.c | 61 +++++++++++++ doc/src/sgml/logicaldecoding.sgml | 27 +++++- src/backend/access/heap/heapam.c | 7 ++ src/backend/access/rmgrdesc/heapdesc.c | 14 +++ src/backend/commands/tablecmds.c | 87 +++++++++++++++++-- src/backend/replication/logical/decode.c | 41 +++++++++ src/backend/replication/logical/logical.c | 43 +++++++++ .../replication/logical/reorderbuffer.c | 35 ++++++++ src/include/access/heapam_xlog.h | 23 ++++- src/include/commands/tablecmds.h | 2 + src/include/replication/output_plugin.h | 10 +++ src/include/replication/reorderbuffer.h | 24 ++++- 15 files changed, 398 insertions(+), 13 deletions(-) create mode 100644 contrib/test_decoding/expected/truncate.out create mode 100644 contrib/test_decoding/sql/truncate.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 6c18189d9d..1d601d8144 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -39,7 +39,7 @@ submake-test_decoding: REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot + spill slot truncate regresscheck: | submake-regress submake-test_decoding temp-install $(pg_regress_check) \ diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out new file mode 100644 index 0000000000..be85178206 --- /dev/null +++ b/contrib/test_decoding/expected/truncate.out @@ -0,0 +1,25 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +CREATE TABLE tab1 (id serial unique, data int); +CREATE TABLE tab2 (a int primary key, b int); +TRUNCATE tab1; +TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; +TRUNCATE tab1, tab2; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------ + BEGIN + table public.tab1: TRUNCATE: (no-flags) + COMMIT + BEGIN + table public.tab1: TRUNCATE: restart_seqs cascade + COMMIT + BEGIN + table public.tab1, public.tab2: TRUNCATE: (no-flags) + COMMIT +(9 rows) + diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql new file mode 100644 index 0000000000..88f113fd5b --- /dev/null +++ b/contrib/test_decoding/sql/truncate.sql @@ -0,0 +1,10 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE tab1 (id serial unique, data int); +CREATE TABLE tab2 (a int primary key, b int); + +TRUNCATE tab1; +TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; +TRUNCATE tab1, tab2; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a94aeeae29..c238f12e66 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pg_decode_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pg_decode_message(LogicalDecodingContext *ctx, @@ -74,6 +78,7 @@ 
_PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pg_decode_startup; cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; + cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; @@ -480,6 +485,62 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + TestDecodingData *data; + MemoryContext old; + int i; + + data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "table "); + + for (i = 0; i < nrelations; i++) + { + Oid relid = RelationGetRelid(relations[i]); + + if (i > 0) + appendStringInfoString(ctx->out, ", "); + + appendStringInfoString(ctx->out, + quote_qualified_identifier( + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid))); + } + + appendStringInfoString(ctx->out, ": TRUNCATE:"); + + if (change->data.truncate_msg.restart_seqs + || change->data.truncate_msg.cascade) + { + if (change->data.truncate_msg.restart_seqs) + appendStringInfo(ctx->out, " restart_seqs"); + if (change->data.truncate_msg.cascade) + appendStringInfo(ctx->out, " cascade"); + } + else + appendStringInfoString(ctx->out, " (no-flags)"); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} + static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, diff --git 
a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index f6b14dccb0..b29cfe6fb4 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -383,6 +383,7 @@ <title>Initialization Function</title> LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; + LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; @@ -394,8 +395,10 @@ <title>Initialization Function</title> The <function>begin_cb</function>, <function>change_cb</function> and <function>commit_cb</function> callbacks are required, while <function>startup_cb</function>, - <function>filter_by_origin_cb</function> + <function>filter_by_origin_cb</function>, <function>truncate_cb</function>, and <function>shutdown_cb</function> are optional. + If <function>truncate_cb</function> is not set but a + <command>TRUNCATE</command> is to be decoded, the action will be ignored. </para> </sect2> @@ -590,6 +593,28 @@ <title>Change Callback</title> </note> </sect3> + <sect3 id="logicaldecoding-output-plugin-truncate"> + <title>Truncate Callback</title> + + <para> + The <function>truncate_cb</function> callback is called for a + <command>TRUNCATE</command> command. +<programlisting> +typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); +</programlisting> + The parameters are analogous to the <function>change_cb</function> + callback. However, because <command>TRUNCATE</command> actions on + tables connected by foreign keys need to be executed together, this + callback receives an array of relations instead of just a single one. + See the description of the <xref linkend="sql-truncate"/> statement for + details. 
+ </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-filter-origin"> <title>Origin Filter Callback</title> diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f96567f5d5..0bafb4fefc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record) case XLOG_HEAP_UPDATE: heap_xlog_update(record, false); break; + case XLOG_HEAP_TRUNCATE: + /* + * TRUNCATE is a no-op because the actions are already logged as + * SMGR WAL records. TRUNCATE WAL record only exists for logical + * decoding. + */ + break; case XLOG_HEAP_HOT_UPDATE: heap_xlog_update(record, true); break; diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index b00c071cb6..349feb6510 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -75,6 +75,17 @@ heap_desc(StringInfo buf, XLogReaderState *record) xlrec->new_offnum, xlrec->new_xmax); } + else if (info == XLOG_HEAP_TRUNCATE) + { + xl_heap_truncate *xlrec = (xl_heap_truncate *) rec; + + if (xlrec->flags & XLH_TRUNCATE_CASCADE) + appendStringInfo(buf, "cascade "); + if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) + appendStringInfo(buf, "restart_seqs "); + appendStringInfo(buf, "nrelids %u", xlrec->nrelids); + /* skip the list of relids */ + } else if (info == XLOG_HEAP_CONFIRM) { xl_heap_confirm *xlrec = (xl_heap_confirm *) rec; @@ -186,6 +197,9 @@ heap_identify(uint8 info) case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE: id = "HOT_UPDATE+INIT"; break; + case XLOG_HEAP_TRUNCATE: + id = "TRUNCATE"; + break; case XLOG_HEAP_CONFIRM: id = "HEAP_CONFIRM"; break; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ec2f9616ed..077b954f36 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/heapam_xlog.h" 
#include "access/multixact.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt) { List *rels = NIL; List *relids = NIL; - List *seq_relids = NIL; - EState *estate; - ResultRelInfo *resultRelInfos; - ResultRelInfo *resultRelInfo; - SubTransactionId mySubid; + List *relids_logged = NIL; ListCell *cell; /* @@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, myrelid); if (recurse) { @@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, childrelid); } } else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) @@ -1379,7 +1382,34 @@ ExecuteTruncate(TruncateStmt *stmt) errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly."))); } + ExecuteTruncateGuts(rels, relids, relids_logged, + stmt->behavior, stmt->restart_seqs); + + /* And close the rels */ + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + + heap_close(rel, NoLock); + } +} + +void +ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, + DropBehavior behavior, bool restart_seqs) +{ + List *rels; + List *seq_relids = NIL; + EState *estate; + ResultRelInfo *resultRelInfos; + ResultRelInfo *resultRelInfo; + SubTransactionId mySubid; + ListCell *cell; + Oid *logrelids; + /* + * Open, exclusive-lock, and check all the explicitly-specified relations + * * In CASCADE mode, suck in all referencing relations as well. This * requires multiple iterations to find indirectly-dependent relations. 
At * each phase, we need to exclusive-lock new rels before looking for their @@ -1387,7 +1417,8 @@ ExecuteTruncate(TruncateStmt *stmt) * soon as we open it, to avoid a faux pas such as holding lock for a long * time on a rel we have no permissions for. */ - if (stmt->behavior == DROP_CASCADE) + rels = list_copy(explicit_rels); + if (behavior == DROP_CASCADE) { for (;;) { @@ -1409,6 +1440,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, relid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, relid); } } } @@ -1421,7 +1455,7 @@ ExecuteTruncate(TruncateStmt *stmt) #ifdef USE_ASSERT_CHECKING heap_truncate_check_FKs(rels, false); #else - if (stmt->behavior == DROP_RESTRICT) + if (behavior == DROP_RESTRICT) heap_truncate_check_FKs(rels, false); #endif @@ -1431,7 +1465,7 @@ ExecuteTruncate(TruncateStmt *stmt) * We want to do this early since it's pointless to do all the truncation * work only to fail on sequence permissions. */ - if (stmt->restart_seqs) + if (restart_seqs) { foreach(cell, rels) { @@ -1586,6 +1620,38 @@ ExecuteTruncate(TruncateStmt *stmt) ResetSequence(seq_relid); } + /* + * Write a WAL record to allow this set of actions to be logically decoded. + * + * Assemble an array of relids so we can write a single WAL record for the + * whole action. 
+ */ + if (list_length(relids_logged) > 0) + { + xl_heap_truncate xlrec; + int i = 0; + + logrelids = palloc(list_length(relids_logged) * sizeof(Oid)); + foreach (cell, relids_logged) + logrelids[i++] = lfirst_oid(cell); + + xlrec.dbId = MyDatabaseId; + xlrec.nrelids = list_length(relids_logged); + xlrec.flags = 0; + if (behavior == DROP_CASCADE) + xlrec.flags |= XLH_TRUNCATE_CASCADE; + if (restart_seqs) + xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate); + XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid)); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE); + } + /* * Process all AFTER STATEMENT TRUNCATE triggers. */ @@ -1603,7 +1669,10 @@ ExecuteTruncate(TruncateStmt *stmt) /* We can clean up the EState now */ FreeExecutorState(estate); - /* And close the rels (can't do this while EState still holds refs) */ + /* And close the eventual rels opened by CASCADE + * (can't do this while EState still holds refs) + */ + rels = list_difference_ptr(rels, explicit_rels); foreach(cell, rels) { Relation rel = (Relation) lfirst(cell); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6eb0d5527e..c94ac29d83 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ 
-449,6 +450,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeDelete(ctx, buf); break; + case XLOG_HEAP_TRUNCATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeTruncate(ctx, buf); + break; + case XLOG_HEAP_INPLACE: /* @@ -825,6 +831,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } +/* + * Parse XLOG_HEAP_TRUNCATE from wal + */ +static void +DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_truncate *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_heap_truncate *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->dbId != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_TRUNCATE; + change->origin_id = XLogRecGetOrigin(r); + if (xlrec->flags & XLH_TRUNCATE_CASCADE) + change->data.truncate_msg.cascade = true; + if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) + change->data.truncate_msg.restart_seqs = true; + change->data.truncate_msg.nrelids = xlrec->nrelids; + change->data.truncate_msg.relids = palloc(xlrec->nrelids * sizeof(Oid)); + memcpy(change->data.truncate_msg.relids, xlrec->relids, + xlrec->nrelids * sizeof(Oid)); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), + buf->origptr, change); +} + /* * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs. 
* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3d8ad7ddf8..0737c7b1e7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); @@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options, /* wrap output plugin callbacks, so we can add error context information */ ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; + ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; @@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (!ctx->callbacks.truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* 
set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b4016ed52b..7691c1a8b4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_TRUNCATE: break; } @@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + int i; + int nrelids = change->data.truncate_msg.nrelids; + int nrelations = 0; + Relation *relations; + + relations = palloc0(nrelids * sizeof(Relation)); + for (i = 0; i < nrelids; i++) + { + Oid relid = change->data.truncate_msg.relids[i]; + Relation relation; + + relation = RelationIdGetRelation(relid); + + if (relation == NULL) + elog(ERROR, "could not open relation with OID %u", relid); + + if (!RelationIsLogicallyLogged(relation)) + continue; + + relations[nrelations++] = relation; + } + + rb->apply_truncate(rb, txn, nrelations, relations, change); + + for (i = 0; i < nrelations; i++) + RelationClose(relations[i]); + + break; + } + case 
REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: @@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } /* the base struct contains all the data, easy peasy */ + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 700e25c36a..0052e4c569 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -32,7 +32,7 @@ #define XLOG_HEAP_INSERT 0x00 #define XLOG_HEAP_DELETE 0x10 #define XLOG_HEAP_UPDATE 0x20 -/* 0x030 is free, was XLOG_HEAP_MOVE */ +#define XLOG_HEAP_TRUNCATE 0x30 #define XLOG_HEAP_HOT_UPDATE 0x40 #define XLOG_HEAP_CONFIRM 0x50 #define XLOG_HEAP_LOCK 0x60 @@ -109,6 +109,27 @@ typedef struct xl_heap_delete #define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8)) +/* + * xl_heap_truncate flag values, 8 bits are available. + */ +#define XLH_TRUNCATE_CASCADE (1<<0) +#define XLH_TRUNCATE_RESTART_SEQS (1<<1) + +/* + * For truncate we list all truncated relids in an array, followed by all + * sequence relids that need to be restarted, if any. + * All rels are always within the same database, so we just list dbid once.
+ */ +typedef struct xl_heap_truncate +{ + Oid dbId; + uint32 nrelids; + uint8 flags; + Oid relids[FLEXIBLE_ARRAY_MEMBER]; +} xl_heap_truncate; + +#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids)) + /* * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted * or updated tuple in WAL; we can save a few bytes by reconstructing the diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 04a961d383..70ee3da76b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void CheckTableNotInUse(Relation rel, const char *stmt); extern void ExecuteTruncate(TruncateStmt *stmt); +extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, + DropBehavior behavior, bool restart_seqs); extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 82875d6b3d..1ee0a56f03 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change); +/* + * Callback for every TRUNCATE in a successful transaction. + */ +typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + /* * Called for every (explicit or implicit) COMMIT of a successful transaction. 
*/ @@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; + LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa430c843c..13a8798115 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -59,7 +59,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_TRUNCATE }; /* @@ -128,6 +129,18 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* + * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing + * one set of relations to be truncated. + */ + struct + { + Size nrelids; + bool cascade; + bool restart_seqs; + Oid *relids; + } truncate_msg; } data; /* @@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) ( Relation relation, ReorderBufferChange *change); +/* truncate callback signature */ +typedef void (*ReorderBufferApplyTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + /* begin callback signature */ typedef void (*ReorderBufferBeginCB) ( ReorderBuffer *rb, @@ -328,6 +349,7 @@ struct ReorderBuffer */ ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; + ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; base-commit: 0a64b45152b593c5eb95f2e88fbce7fbfe84ae7b -- 2.17.0
From 7ee846501f8532cc6b6ebc06b07b8555ae8a57f4 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <pete...@gmx.net> Date: Thu, 5 Apr 2018 11:47:06 -0400 Subject: [PATCH v19 2/2] Logical replication support for TRUNCATE --- doc/src/sgml/catalogs.sgml | 8 + doc/src/sgml/logical-replication.sgml | 13 +- doc/src/sgml/protocol.sgml | 56 +++++++ doc/src/sgml/ref/create_publication.sgml | 10 +- src/backend/catalog/pg_publication.c | 1 + src/backend/commands/publicationcmds.c | 20 ++- src/backend/replication/logical/proto.c | 55 +++++++ src/backend/replication/logical/worker.c | 68 +++++++++ src/backend/replication/pgoutput/pgoutput.c | 129 +++++++++++----- src/backend/utils/cache/relcache.c | 3 +- src/bin/pg_dump/pg_dump.c | 33 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 2 +- src/bin/psql/describe.c | 26 +++- src/include/catalog/pg_publication.h | 7 +- src/include/replication/logicalproto.h | 4 + src/test/regress/expected/publication.out | 84 +++++----- src/test/subscription/t/010_truncate.pl | 161 ++++++++++++++++++++ 18 files changed, 571 insertions(+), 110 deletions(-) create mode 100644 src/test/subscription/t/010_truncate.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d6a9d8c580..e8efa13e8d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5518,6 +5518,14 @@ <title><structname>pg_publication</structname> Columns</title> <entry>If true, <command>DELETE</command> operations are replicated for tables in the publication.</entry> </row> + + <row> + <entry><structfield>pubtruncate</structfield></entry> + <entry><type>bool</type></entry> + <entry></entry> + <entry>If true, <command>TRUNCATE</command> operations are replicated for + tables in the publication.</entry> + </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 75551d8ee1..151e773fc2 100644 --- a/doc/src/sgml/logical-replication.sgml +++ 
b/doc/src/sgml/logical-replication.sgml @@ -108,8 +108,8 @@ <title>Publication</title> <para> Publications can choose to limit the changes they produce to - any combination of <command>INSERT</command>, <command>UPDATE</command>, and - <command>DELETE</command>, similar to how triggers are fired by + any combination of <command>INSERT</command>, <command>UPDATE</command>, + <command>DELETE</command>, and <command>TRUNCATE</command>, similar to how triggers are fired by particular event types. By default, all operation types are replicated. </para> @@ -364,15 +364,6 @@ <title>Restrictions</title> </para> </listitem> - <listitem> - <para> - <command>TRUNCATE</command> commands are not replicated. This can, of - course, be worked around by using <command>DELETE</command> instead. To - avoid accidental <command>TRUNCATE</command> invocations, you can revoke - the <literal>TRUNCATE</literal> privilege from tables. - </para> - </listitem> - <listitem> <para> Large objects (see <xref linkend="largeobjects"/>) are not replicated. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b94dd4ac65..004b36084f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6774,6 +6774,62 @@ <title>Logical Replication Message Formats</title> </listitem> </varlistentry> +<varlistentry> +<term> +Truncate +</term> +<listitem> +<para> + +<variablelist> +<varlistentry> +<term> + Byte1('T') +</term> +<listitem> +<para> + Identifies the message as a truncate message. 
+</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int32 +</term> +<listitem> +<para> + Number of relations +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int8 +</term> +<listitem> +<para> + Option bits for <command>TRUNCATE</command>: + 1 for <literal>CASCADE</literal>, 2 for <literal>RESTART IDENTITY</literal> +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int32 +</term> +<listitem> +<para> + ID of the relation corresponding to the ID in the relation + message. This field is repeated for each relation. +</para> +</listitem> +</varlistentry> + +</variablelist> +</para> +</listitem> +</varlistentry> + </variablelist> <para> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index bfe12d5f41..99f87ca393 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -106,10 +106,11 @@ <title>Parameters</title> This parameter determines which DML operations will be published by the new publication to the subscribers. The value is comma-separated list of operations. The allowed operations are - <literal>insert</literal>, <literal>update</literal>, and - <literal>delete</literal>. The default is to publish all actions, + <literal>insert</literal>, <literal>update</literal>, + <literal>delete</literal>, and <literal>truncate</literal>. + The default is to publish all actions, and so the default value for this option is - <literal>'insert, update, delete'</literal>. + <literal>'insert, update, delete, truncate'</literal>. </para> </listitem> </varlistentry> @@ -168,8 +169,7 @@ <title>Notes</title> </para> <para> - <command>TRUNCATE</command> and <acronym>DDL</acronym> operations - are not published. + <acronym>DDL</acronym> operations are not published. 
</para> </refsect1> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ba18258ebb..ec3bd1d22d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -376,6 +376,7 @@ GetPublication(Oid pubid) pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; + pub->pubactions.pubtruncate = pubform->pubtruncate; ReleaseSysCache(tup); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c5aa9ebc2..29992d4a0e 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -62,7 +62,8 @@ parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, - bool *publish_delete) + bool *publish_delete, + bool *publish_truncate) { ListCell *lc; @@ -72,6 +73,7 @@ parse_publication_options(List *options, *publish_insert = true; *publish_update = true; *publish_delete = true; + *publish_truncate = true; /* Parse options */ foreach(lc, options) @@ -96,6 +98,7 @@ parse_publication_options(List *options, *publish_insert = false; *publish_update = false; *publish_delete = false; + *publish_truncate = false; *publish_given = true; publish = defGetString(defel); @@ -116,6 +119,8 @@ parse_publication_options(List *options, *publish_update = true; else if (strcmp(publish_opt, "delete") == 0) *publish_delete = true; + else if (strcmp(publish_opt, "truncate") == 0) + *publish_truncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt) bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; AclResult aclresult; /* must have CREATE privilege on database */ @@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt) parse_publication_options(stmt->options, &publish_given, &publish_insert, - 
&publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); @@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt) BoolGetDatum(publish_update); values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubtruncate - 1] = + BoolGetDatum(publish_truncate); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; ObjectAddress obj; parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); replaces[Anum_pg_publication_pubdelete - 1] = true; + + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + replaces[Anum_pg_publication_pubtruncate - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4ae..edc97a7662 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -26,6 +26,9 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); @@ -292,6 +295,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) return relid; } +/* + * Write TRUNCATE to the output 
stream. + */ +void +logicalrep_write_truncate(StringInfo out, + int nrelids, + Oid relids[], + bool cascade, bool restart_seqs) +{ + int i; + uint8 flags = 0; + + pq_sendbyte(out, 'T'); /* action TRUNCATE */ + + pq_sendint32(out, nrelids); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); + + for (i = 0; i < nrelids; i++) + pq_sendint32(out, relids[i]); +} + +/* + * Read TRUNCATE from stream. + */ +List * +logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs) +{ + int i; + int nrelids; + List *relids = NIL; + uint8 flags; + + nrelids = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + for (i = 0; i < nrelids; i++) + relids = lappend_oid(relids, pq_getmsgint(in, 4)); + + return relids; +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fdace7eea2..18766364f7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -30,10 +30,12 @@ #include "access/xact.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -83,6 +85,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/timeout.h" #include "utils/tqual.h" #include "utils/syscache.h" @@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s) CommandCounterIncrement(); } +/* + * Handle TRUNCATE message. 
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+	bool cascade = false;
+	bool restart_seqs = false;
+	List *remote_relids = NIL;
+	List *remote_rels = NIL;
+	List *rels = NIL;
+	List *relids = NIL;
+	List *relids_logged = NIL;
+	ListCell *lc;
+
+	ensure_transaction();
+
+	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+	foreach(lc, remote_relids)
+	{
+		LogicalRepRelId relid = lfirst_oid(lc);
+		LogicalRepRelMapEntry *rel;
+
+		rel = logicalrep_rel_open(relid, RowExclusiveLock);
+		if (!should_apply_changes_for_rel(rel))
+		{
+			/*
+			 * The relation can't become interesting in the middle of the
+			 * transaction so it's safe to unlock it.
+			 */
+			logicalrep_rel_close(rel, RowExclusiveLock);
+			continue;
+		}
+
+		remote_rels = lappend(remote_rels, rel);
+		rels = lappend(rels, rel->localrel);
+		relids = lappend_oid(relids, rel->localreloid);
+		if (RelationIsLogicallyLogged(rel->localrel))
+			relids_logged = lappend_oid(relids_logged, rel->localreloid);
+	}
+
+	/*
+	 * Even if CASCADE was used on the upstream master, replay the truncate
+	 * with DROP_RESTRICT, i.e. without further cascading.  This might be
+	 * later changeable with a user specified option.
+	 */
+	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
+
+	foreach(lc, remote_rels)
+	{
+		LogicalRepRelMapEntry *rel = lfirst(lc);
+
+		logicalrep_rel_close(rel, NoLock);
+	}
+
+	CommandCounterIncrement();
+}
+
 /*
  * Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s) case 'D': apply_handle_delete(s); break; + /* TRUNCATE */ + case 'T': + apply_handle_truncate(s); + break; /* RELATION */ case 'R': apply_handle_relation(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54e..e31ad7e027 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -250,6 +254,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* + * Write the relation schema if the current schema haven't been sent yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + Relation relation, RelationSyncEntry *relentry) +{ + if (!relentry->schema_sent) + { + TupleDesc desc; + int i; + + desc = RelationGetDescr(relation); + + /* + * Write out type info if needed. We do that only for user created + * types. 
+		 */
+		for (i = 0; i < desc->natts; i++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, i);
+
+			if (att->attisdropped)
+				continue;
+
+			if (att->atttypid < FirstNormalObjectId)
+				continue;
+
+			OutputPluginPrepareWrite(ctx, false);
+			logicalrep_write_typ(ctx->out, att->atttypid);
+			OutputPluginWrite(ctx, false);
+		}
+
+		OutputPluginPrepareWrite(ctx, false);
+		logicalrep_write_rel(ctx->out, relation);
+		OutputPluginWrite(ctx, false);
+		relentry->schema_sent = true;
+	}
+}
+
 /*
  * Sends the decoded DML over wire.
  */
@@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/*
-	 * Write the relation schema if the current schema haven't been sent yet.
-	 */
-	if (!relentry->schema_sent)
-	{
-		TupleDesc desc;
-		int i;
-
-		desc = RelationGetDescr(relation);
-
-		/*
-		 * Write out type info if needed. We do that only for user created
-		 * types.
-		 */
-		for (i = 0; i < desc->natts; i++)
-		{
-			Form_pg_attribute att = TupleDescAttr(desc, i);
-
-			if (att->attisdropped)
-				continue;
-
-			if (att->atttypid < FirstNormalObjectId)
-				continue;
-
-			OutputPluginPrepareWrite(ctx, false);
-			logicalrep_write_typ(ctx->out, att->atttypid);
-			OutputPluginWrite(ctx, false);
-		}
-
-		OutputPluginPrepareWrite(ctx, false);
-		logicalrep_write_rel(ctx->out, relation);
-		OutputPluginWrite(ctx, false);
-		relentry->schema_sent = true;
-	}
+	maybe_send_schema(ctx, relation, relentry);
 
 	/* Send the data */
 	switch (change->action)
@@ -363,6 +374,59 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 }
 
+/*
+ * Sends a decoded TRUNCATE over the wire, naming only the relations that
+ * are publishable and whose pubactions include truncate.
+ */
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				  int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	MemoryContext old;
+	RelationSyncEntry *relentry;
+	int i;
+	int nrelids;
+	Oid *relids;
+
+	old = MemoryContextSwitchTo(data->context);
+
+	relids = palloc0(nrelations * sizeof(Oid));
+	nrelids = 0;
+
+	for (i = 0; i < nrelations; i++)
+	{
+		Relation relation = relations[i];
+		Oid relid = RelationGetRelid(relation);
+
+		if (!is_publishable_relation(relation))
+			continue;
+
+		relentry = get_rel_sync_entry(data, relid);
+
+		if (!relentry->pubactions.pubtruncate)
+			continue;
+
+		relids[nrelids++] = relid;
+		maybe_send_schema(ctx, relation, relentry);
+	}
+
+	/* Nothing to tell the subscriber if every relation was filtered out. */
+	if (nrelids > 0)
+	{
+		OutputPluginPrepareWrite(ctx, true);
+		logicalrep_write_truncate(ctx->out,
+								  nrelids,
+								  relids,
+								  change->data.truncate_msg.cascade,
+								  change->data.truncate_msg.restart_seqs);
+		OutputPluginWrite(ctx, true);
+	}
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
 /*
  * Currently we always forward.
  */
@@ -504,7 +568,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 * we only need to consider ones that the subscriber requested.
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 
 		foreach(lc, data->publications)
 		{
@@ -515,10 +579,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete)
+				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
 				break;
 		}
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 69a2114a10..6a67c185b0 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation)
 		pubactions->pubinsert |= pubform->pubinsert;
 		pubactions->pubupdate |= pubform->pubupdate;
 		pubactions->pubdelete |= 
pubform->pubdelete; + pubactions->pubtruncate |= pubform->pubtruncate; ReleaseSysCache(tup); @@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation) * other publications. */ if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete) + pubactions->pubdelete && pubactions->pubtruncate) break; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index d066f4f00b..69016a6c4d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3712,6 +3712,7 @@ getPublications(Archive *fout) int i_pubinsert; int i_pubupdate; int i_pubdelete; + int i_pubtruncate; int i, ntups; @@ -3723,12 +3724,20 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. */ - appendPQExpBuffer(query, - "SELECT p.tableoid, p.oid, p.pubname, " - "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete " - "FROM pg_publication p", - username_subquery); + if (fout->remoteVersion >= 110000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate " + "FROM pg_publication p", + username_subquery); + else + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate " + "FROM pg_publication p", + username_subquery); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -3742,6 +3751,7 @@ getPublications(Archive *fout) i_pubinsert = PQfnumber(res, "pubinsert"); i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); + i_pubtruncate = PQfnumber(res, "pubtruncate"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3762,6 +3772,8 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0); pubinfo[i].pubdelete = (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); + 
pubinfo[i].pubtruncate = + (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n", @@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) first = false; } + if (pubinfo->pubtruncate) + { + if (!first) + appendPQExpBufferStr(query, ", "); + + appendPQExpBufferStr(query, "truncate"); + first = false; + } + appendPQExpBufferStr(query, "');\n"); ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a4d6d926a8..c2314758de 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -595,6 +595,7 @@ typedef struct _PublicationInfo bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index acdfde2a1e..25852b903c 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2038,7 +2038,7 @@ create_order => 50, create_sql => 'CREATE PUBLICATION pub1;', regexp => qr/^ - \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E + \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E /xm, like => { %full_runs, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 0c3be1f504..75a1e42cee 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5187,7 +5187,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5207,13 +5207,17 @@ listPublications(const char *pattern) " puballtables AS \"%s\",\n" " pubinsert AS \"%s\",\n" " pubupdate AS \"%s\",\n" - " pubdelete AS 
\"%s\"\n", + " pubdelete AS \"%s\"", gettext_noop("Name"), gettext_noop("Owner"), gettext_noop("All tables"), gettext_noop("Inserts"), gettext_noop("Updates"), gettext_noop("Deletes")); + if (pset.sversion >= 110000) + appendPQExpBuffer(&buf, + ",\n pubtruncate AS \"%s\"", + gettext_noop("Truncates")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5254,6 +5258,7 @@ describePublications(const char *pattern) PQExpBufferData buf; int i; PGresult *res; + bool has_pubtruncate; if (pset.sversion < 100000) { @@ -5265,13 +5270,19 @@ describePublications(const char *pattern) return true; } + has_pubtruncate = (pset.sversion >= 110000); + initPQExpBuffer(&buf); printfPQExpBuffer(&buf, "SELECT oid, pubname,\n" " pg_catalog.pg_get_userbyid(pubowner) AS owner,\n" - " puballtables, pubinsert, pubupdate, pubdelete\n" - "FROM pg_catalog.pg_publication\n"); + " puballtables, pubinsert, pubupdate, pubdelete"); + if (has_pubtruncate) + appendPQExpBuffer(&buf, + ", pubtruncate"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_publication\n"); processSQLNamePattern(pset.db, &buf, pattern, false, false, NULL, "pubname", NULL, @@ -5317,6 +5328,9 @@ describePublications(const char *pattern) printTableOpt myopt = pset.popt.topt; printTableContent cont; + if (has_pubtruncate) + ncols++; + initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); printTableInit(&cont, &myopt, title.data, ncols, nrows); @@ -5326,12 +5340,16 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); printTableAddHeader(&cont, gettext_noop("Updates"), true, align); printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); + if (has_pubtruncate) + printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); 
printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); + if (has_pubtruncate) + printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (!puballtables) { diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 37e77b8be7..b643c489cd 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -49,6 +49,9 @@ CATALOG(pg_publication,6104) /* true if deletes are published */ bool pubdelete; + /* true if truncates are published */ + bool pubtruncate; + } FormData_pg_publication; /* ---------------- @@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication; * ---------------- */ -#define Natts_pg_publication 6 +#define Natts_pg_publication 7 #define Anum_pg_publication_pubname 1 #define Anum_pg_publication_pubowner 2 #define Anum_pg_publication_puballtables 3 #define Anum_pg_publication_pubinsert 4 #define Anum_pg_publication_pubupdate 5 #define Anum_pg_publication_pubdelete 6 +#define Anum_pg_publication_pubtruncate 7 typedef struct PublicationActions { bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationActions; typedef struct Publication diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 116f16f42d..92e88d3127 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); +extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], + bool cascade, bool restart_seqs); +extern List *logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs); extern void logicalrep_write_rel(StringInfo out, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo 
in); extern void logicalrep_write_typ(StringInfo out, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 0c86c647bc..afbbdd543d 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -21,20 +21,20 @@ ERROR: unrecognized publication parameter: foo CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); ERROR: unrecognized "publish" value: "cluster" \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | f | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f (2 rows) --- adding tables @@ -76,10 +76,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | 
Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | t | t | t | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | t | t | t | f | f (1 row) DROP TABLE testpub_tbl2; @@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" @@ -119,10 +119,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_fortbl + Owner | All tables | Inserts 
| Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -165,10 +165,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -210,10 +210,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f (1 row) -- fail - must be owner of publication @@ -223,20 +223,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes --------------+--------------------------+------------+---------+---------+--------- - testpub_foo | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-------------+--------------------------+------------+---------+---------+---------+----------- + testpub_foo | regress_publication_user | f | t | t | t | f (1 row) -- rename 
back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ------------------+---------------------------+------------+---------+---------+--------- - testpub_default | regress_publication_user2 | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-----------------+---------------------------+------------+---------+---------+---------+----------- + testpub_default | regress_publication_user2 | f | t | t | t | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl new file mode 100644 index 0000000000..8ea4ab624f --- /dev/null +++ b/src/test/subscription/t/010_truncate.pl @@ -0,0 +1,161 @@ +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 9; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE SEQUENCE seq1 OWNED BY tab1.a" +); +$node_subscriber->safe_psql('postgres', + "ALTER SEQUENCE seq1 START 101" +); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr application_name=sub3' PUBLICATION pub3"); + +$node_publisher->wait_for_catchup('sub1'); + +# insert data to truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +# truncate and check + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +my $result = 
$node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(1), + 'sequence not restarted'); + +# truncate with restart identity + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(101), + 'truncate restarted identities'); + +# test publication that does not replicate truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), + 'truncate not replicated'); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), + 'truncate replicated after publication change'); + +# test multiple tables connected by foreign keys + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4"); + +$node_publisher->wait_for_catchup('sub3'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3"); +is($result, qq(0||), + 'truncate of multiple tables replicated'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(x), max(x) FROM tab4"); +is($result, qq(0||), + 'truncate 
of multiple tables replicated'); + +# test truncate of multiple tables, some of which are not published + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2"); + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate of multiple tables some not published'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), + 'truncate of multiple tables some not published'); -- 2.17.0