On 4/3/18 22:31, Peter Eisentraut wrote:
> The problem I see now is that when we WAL-log a truncate, we include all
> the relids in one record.  That seems useful.  But during decoding we
> split this into one change per relid.  So at the receiving end, truncate
> can only process one relation at a time, which will fail if there are
> foreign keys involved.  The solution that had been proposed here was to
> ignore foreign keys on the subscriber, which is clearly problematic.

> I'm going to try to hack up an alternative approach and see how it works
> out.

Done here.  I added a separate callback for decoding truncates, which
receives all the relations at once.  That information can then be
shipped off together and applied together on the receiving side.  So
foreign keys are not a problem anymore.  This ended up being a bit
larger than the original patch, but I think it's sound behavior and
future-proof.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From df101e32c358ac9243285d4e8f125803988e5508 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Thu, 5 Apr 2018 11:46:41 -0400
Subject: [PATCH v19 1/2] Logical decoding of TRUNCATE

---
 contrib/test_decoding/Makefile                |  2 +-
 contrib/test_decoding/expected/truncate.out   | 25 ++++++
 contrib/test_decoding/sql/truncate.sql        | 10 +++
 contrib/test_decoding/test_decoding.c         | 61 +++++++++++++
 doc/src/sgml/logicaldecoding.sgml             | 27 +++++-
 src/backend/access/heap/heapam.c              |  7 ++
 src/backend/access/rmgrdesc/heapdesc.c        | 14 +++
 src/backend/commands/tablecmds.c              | 87 +++++++++++++++++--
 src/backend/replication/logical/decode.c      | 41 +++++++++
 src/backend/replication/logical/logical.c     | 43 +++++++++
 .../replication/logical/reorderbuffer.c       | 35 ++++++++
 src/include/access/heapam_xlog.h              | 23 ++++-
 src/include/commands/tablecmds.h              |  2 +
 src/include/replication/output_plugin.h       | 10 +++
 src/include/replication/reorderbuffer.h       | 24 ++++-
 15 files changed, 398 insertions(+), 13 deletions(-)
 create mode 100644 contrib/test_decoding/expected/truncate.out
 create mode 100644 contrib/test_decoding/sql/truncate.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 6c18189d9d..1d601d8144 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -39,7 +39,7 @@ submake-test_decoding:
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
        decoding_into_rel binary prepared replorigin time messages \
-       spill slot
+       spill slot truncate
 
 regresscheck: | submake-regress submake-test_decoding temp-install
        $(pg_regress_check) \
diff --git a/contrib/test_decoding/expected/truncate.out 
b/contrib/test_decoding/expected/truncate.out
new file mode 100644
index 0000000000..be85178206
--- /dev/null
+++ b/contrib/test_decoding/expected/truncate.out
@@ -0,0 +1,25 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
+                         data                         
+------------------------------------------------------
+ BEGIN
+ table public.tab1: TRUNCATE: (no-flags)
+ COMMIT
+ BEGIN
+ table public.tab1: TRUNCATE: restart_seqs cascade
+ COMMIT
+ BEGIN
+ table public.tab1, public.tab2: TRUNCATE: (no-flags)
+ COMMIT
+(9 rows)
+
diff --git a/contrib/test_decoding/sql/truncate.sql 
b/contrib/test_decoding/sql/truncate.sql
new file mode 100644
index 0000000000..88f113fd5b
--- /dev/null
+++ b/contrib/test_decoding/sql/truncate.sql
@@ -0,0 +1,10 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
diff --git a/contrib/test_decoding/test_decoding.c 
b/contrib/test_decoding/test_decoding.c
index a94aeeae29..c238f12e66 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 static void pg_decode_change(LogicalDecodingContext *ctx,
                                 ReorderBufferTXN *txn, Relation rel,
                                 ReorderBufferChange *change);
+static void pg_decode_truncate(LogicalDecodingContext *ctx,
+                                                          ReorderBufferTXN 
*txn,
+                                                          int nrelations, 
Relation relations[],
+                                                          ReorderBufferChange 
*change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                 RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
@@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->startup_cb = pg_decode_startup;
        cb->begin_cb = pg_decode_begin_txn;
        cb->change_cb = pg_decode_change;
+       cb->truncate_cb = pg_decode_truncate;
        cb->commit_cb = pg_decode_commit_txn;
        cb->filter_by_origin_cb = pg_decode_filter;
        cb->shutdown_cb = pg_decode_shutdown;
@@ -480,6 +485,62 @@ pg_decode_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                  int nrelations, Relation relations[], 
ReorderBufferChange *change)
+{
+       TestDecodingData *data;
+       MemoryContext old;
+       int                     i;
+
+       data = ctx->output_plugin_private;
+
+       /* output BEGIN if we haven't yet */
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+       {
+               pg_output_begin(ctx, data, txn, false);
+       }
+       data->xact_wrote_changes = true;
+
+       /* Avoid leaking memory by using and resetting our own context */
+       old = MemoryContextSwitchTo(data->context);
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       appendStringInfoString(ctx->out, "table ");
+
+       for (i = 0; i < nrelations; i++)
+       {
+               Oid                     relid = RelationGetRelid(relations[i]);
+
+               if (i > 0)
+                       appendStringInfoString(ctx->out, ", ");
+
+               appendStringInfoString(ctx->out,
+                                                          
quote_qualified_identifier(
+                                                                  
get_namespace_name(get_rel_namespace(relid)),
+                                                                  
get_rel_name(relid)));
+       }
+
+       appendStringInfoString(ctx->out, ": TRUNCATE:");
+
+       if (change->data.truncate_msg.restart_seqs
+               || change->data.truncate_msg.cascade)
+       {
+               if (change->data.truncate_msg.restart_seqs)
+                       appendStringInfo(ctx->out, " restart_seqs");
+               if (change->data.truncate_msg.cascade)
+                       appendStringInfo(ctx->out, " cascade");
+       }
+       else
+               appendStringInfoString(ctx->out, " (no-flags)");
+
+       MemoryContextSwitchTo(old);
+       MemoryContextReset(data->context);
+
+       OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_message(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn, XLogRecPtr lsn, bool 
transactional,
diff --git a/doc/src/sgml/logicaldecoding.sgml 
b/doc/src/sgml/logicaldecoding.sgml
index f6b14dccb0..b29cfe6fb4 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -383,6 +383,7 @@ <title>Initialization Function</title>
     LogicalDecodeStartupCB startup_cb;
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
+    LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
@@ -394,8 +395,10 @@ <title>Initialization Function</title>
      The <function>begin_cb</function>, <function>change_cb</function>
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
-     <function>filter_by_origin_cb</function>
+     <function>filter_by_origin_cb</function>, 
<function>truncate_cb</function>,
      and <function>shutdown_cb</function> are optional.
+     If <function>truncate_cb</function> is not set but a
+     <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
    </sect2>
 
@@ -590,6 +593,28 @@ <title>Change Callback</title>
      </note>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-truncate">
+     <title>Truncate Callback</title>
+
+     <para>
+      The <function>truncate_cb</function> callback is called for a
+      <command>TRUNCATE</command> command.
+<programlisting>
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         int nrelations,
+                                         Relation relations[],
+                                         ReorderBufferChange *change);
+</programlisting>
+      The parameters are analogous to the <function>change_cb</function>
+      callback.  However, because <command>TRUNCATE</command> actions on
+      tables connected by foreign keys need to be executed together, this
+      callback receives an array of relations instead of just a single one.
+      See the description of the <xref linkend="sql-truncate"/> statement for
+      details.
+     </para>
+    </sect3>
+
      <sect3 id="logicaldecoding-output-plugin-filter-origin">
      <title>Origin Filter Callback</title>
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f96567f5d5..0bafb4fefc 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
                case XLOG_HEAP_UPDATE:
                        heap_xlog_update(record, false);
                        break;
+               case XLOG_HEAP_TRUNCATE:
+                       /*
+                        * TRUNCATE is a no-op because the actions are already 
logged as
+                        * SMGR WAL records.  TRUNCATE WAL record only exists 
for logical
+                        * decoding.
+                        */
+                       break;
                case XLOG_HEAP_HOT_UPDATE:
                        heap_xlog_update(record, true);
                        break;
diff --git a/src/backend/access/rmgrdesc/heapdesc.c 
b/src/backend/access/rmgrdesc/heapdesc.c
index b00c071cb6..349feb6510 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -75,6 +75,17 @@ heap_desc(StringInfo buf, XLogReaderState *record)
                                                 xlrec->new_offnum,
                                                 xlrec->new_xmax);
        }
+       else if (info == XLOG_HEAP_TRUNCATE)
+       {
+               xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
+
+               if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+                       appendStringInfo(buf, "cascade ");
+               if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+                       appendStringInfo(buf, "restart_seqs ");
+               appendStringInfo(buf, "nrelids %u", xlrec->nrelids);
+               /* skip the list of relids */
+       }
        else if (info == XLOG_HEAP_CONFIRM)
        {
                xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
@@ -186,6 +197,9 @@ heap_identify(uint8 info)
                case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
                        id = "HOT_UPDATE+INIT";
                        break;
+               case XLOG_HEAP_TRUNCATE:
+                       id = "TRUNCATE";
+                       break;
                case XLOG_HEAP_CONFIRM:
                        id = "HEAP_CONFIRM";
                        break;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index ec2f9616ed..077b954f36 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/heapam_xlog.h"
 #include "access/multixact.h"
 #include "access/reloptions.h"
 #include "access/relscan.h"
@@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 {
        List       *rels = NIL;
        List       *relids = NIL;
-       List       *seq_relids = NIL;
-       EState     *estate;
-       ResultRelInfo *resultRelInfos;
-       ResultRelInfo *resultRelInfo;
-       SubTransactionId mySubid;
+       List       *relids_logged = NIL;
        ListCell   *cell;
 
        /*
@@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                truncate_check_rel(rel);
                rels = lappend(rels, rel);
                relids = lappend_oid(relids, myrelid);
+               /* Log this relation only if needed for logical decoding */
+               if (RelationIsLogicallyLogged(rel))
+                       relids_logged = lappend_oid(relids_logged, myrelid);
 
                if (recurse)
                {
@@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                                truncate_check_rel(rel);
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, childrelid);
+                               /* Log this relation only if needed for logical 
decoding */
+                               if (RelationIsLogicallyLogged(rel))
+                                       relids_logged = 
lappend_oid(relids_logged, childrelid);
                        }
                }
                else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -1379,7 +1382,34 @@ ExecuteTruncate(TruncateStmt *stmt)
                                         errhint("Do not specify the ONLY 
keyword, or use TRUNCATE ONLY on the partitions directly.")));
        }
 
+       ExecuteTruncateGuts(rels, relids, relids_logged,
+                                               stmt->behavior, 
stmt->restart_seqs);
+
+       /* And close the rels */
+       foreach(cell, rels)
+       {
+               Relation        rel = (Relation) lfirst(cell);
+
+               heap_close(rel, NoLock);
+       }
+}
+
+void
+ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
+                                                       DropBehavior behavior, 
bool restart_seqs)
+{
+       List       *rels;
+       List       *seq_relids = NIL;
+       EState     *estate;
+       ResultRelInfo *resultRelInfos;
+       ResultRelInfo *resultRelInfo;
+       SubTransactionId mySubid;
+       ListCell   *cell;
+       Oid                *logrelids;
+
        /*
+        * Open, exclusive-lock, and check all the explicitly-specified 
relations
+        *
         * In CASCADE mode, suck in all referencing relations as well.  This
         * requires multiple iterations to find indirectly-dependent relations. 
At
         * each phase, we need to exclusive-lock new rels before looking for 
their
@@ -1387,7 +1417,8 @@ ExecuteTruncate(TruncateStmt *stmt)
         * soon as we open it, to avoid a faux pas such as holding lock for a 
long
         * time on a rel we have no permissions for.
         */
-       if (stmt->behavior == DROP_CASCADE)
+       rels = list_copy(explicit_rels);
+       if (behavior == DROP_CASCADE)
        {
                for (;;)
                {
@@ -1409,6 +1440,9 @@ ExecuteTruncate(TruncateStmt *stmt)
                                truncate_check_rel(rel);
                                rels = lappend(rels, rel);
                                relids = lappend_oid(relids, relid);
+                               /* Log this relation only if needed for logical 
decoding */
+                               if (RelationIsLogicallyLogged(rel))
+                                       relids_logged = 
lappend_oid(relids_logged, relid);
                        }
                }
        }
@@ -1421,7 +1455,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 #ifdef USE_ASSERT_CHECKING
        heap_truncate_check_FKs(rels, false);
 #else
-       if (stmt->behavior == DROP_RESTRICT)
+       if (behavior == DROP_RESTRICT)
                heap_truncate_check_FKs(rels, false);
 #endif
 
@@ -1431,7 +1465,7 @@ ExecuteTruncate(TruncateStmt *stmt)
         * We want to do this early since it's pointless to do all the 
truncation
         * work only to fail on sequence permissions.
         */
-       if (stmt->restart_seqs)
+       if (restart_seqs)
        {
                foreach(cell, rels)
                {
@@ -1586,6 +1620,38 @@ ExecuteTruncate(TruncateStmt *stmt)
                ResetSequence(seq_relid);
        }
 
+       /*
+        * Write a WAL record to allow this set of actions to be logically 
decoded.
+        *
+        * Assemble an array of relids so we can write a single WAL record for 
the
+        * whole action.
+        */
+       if (list_length(relids_logged) > 0)
+       {
+               xl_heap_truncate xlrec;
+               int                     i = 0;
+
+               logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
+               foreach (cell, relids_logged)
+                       logrelids[i++] = lfirst_oid(cell);
+
+               xlrec.dbId = MyDatabaseId;
+               xlrec.nrelids = list_length(relids_logged);
+               xlrec.flags = 0;
+               if (behavior == DROP_CASCADE)
+                       xlrec.flags |= XLH_TRUNCATE_CASCADE;
+               if (restart_seqs)
+                       xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
+
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
+               XLogRegisterData((char *) logrelids, list_length(relids_logged) 
* sizeof(Oid));
+
+               XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+               (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
+       }
+
        /*
         * Process all AFTER STATEMENT TRUNCATE triggers.
         */
@@ -1603,7 +1669,10 @@ ExecuteTruncate(TruncateStmt *stmt)
        /* We can clean up the EState now */
        FreeExecutorState(estate);
 
-       /* And close the rels (can't do this while EState still holds refs) */
+       /* And close the eventual rels opened by CASCADE
+        * (can't do this while EState still holds refs)
+        */
+       rels = list_difference_ptr(rels, explicit_rels);
        foreach(cell, rels)
        {
                Relation        rel = (Relation) lfirst(cell);
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 6eb0d5527e..c94ac29d83 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, 
XLogRecordBuffer *bu
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf);
 
@@ -449,6 +450,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                                DecodeDelete(ctx, buf);
                        break;
 
+               case XLOG_HEAP_TRUNCATE:
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                               DecodeTruncate(ctx, buf);
+                       break;
+
                case XLOG_HEAP_INPLACE:
 
                        /*
@@ -825,6 +831,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
        ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, 
change);
 }
 
+/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+       XLogReaderState *r = buf->record;
+       xl_heap_truncate *xlrec;
+       ReorderBufferChange *change;
+
+       xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+       /* only interested in our database */
+       if (xlrec->dbId != ctx->slot->data.database)
+               return;
+
+       /* output plugin doesn't look for this origin, no need to queue */
+       if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+               return;
+
+       change = ReorderBufferGetChange(ctx->reorder);
+       change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+       change->origin_id = XLogRecGetOrigin(r);
+       if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+               change->data.truncate_msg.cascade = true;
+       if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+               change->data.truncate_msg.restart_seqs = true;
+       change->data.truncate_msg.nrelids = xlrec->nrelids;
+       change->data.truncate_msg.relids = palloc(xlrec->nrelids * sizeof(Oid));
+       memcpy(change->data.truncate_msg.relids, xlrec->relids,
+                  xlrec->nrelids * sizeof(Oid));
+       ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+                                                        buf->origptr, change);
+}
+
 /*
  * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
  *
diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 3d8ad7ddf8..0737c7b1e7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, 
ReorderBufferTXN *txn,
                                  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  Relation relation, ReorderBufferChange 
*change);
+static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                 int nrelations, Relation relations[], 
ReorderBufferChange *change);
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                   XLogRecPtr message_lsn, bool transactional,
                                   const char *prefix, Size message_size, const 
char *message);
@@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options,
        /* wrap output plugin callbacks, so we can add error context 
information */
        ctx->reorder->begin = begin_cb_wrapper;
        ctx->reorder->apply_change = change_cb_wrapper;
+       ctx->reorder->apply_truncate = truncate_cb_wrapper;
        ctx->reorder->commit = commit_cb_wrapper;
        ctx->reorder->message = message_cb_wrapper;
 
@@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN 
*txn,
        error_context_stack = errcallback.previous;
 }
 
+static void
+truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                       int nrelations, Relation relations[], 
ReorderBufferChange *change)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       if (!ctx->callbacks.truncate_cb)
+               return;
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "truncate";
+       state.report_location = change->lsn;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+
+       /*
+        * report this change's lsn so replies from clients can give an up2date
+        * answer. This won't ever be enough (and shouldn't be!) to confirm
+        * receipt of this transaction, but it might allow another transaction's
+        * commit to be confirmed with one message.
+        */
+       ctx->write_location = change->lsn;
+
+       ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index b4016ed52b..7691c1a8b4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, 
ReorderBufferChange *change)
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                        break;
        }
 
@@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        specinsert = change;
                                        break;
 
+                               case REORDER_BUFFER_CHANGE_TRUNCATE:
+                               {
+                                       int                     i;
+                                       int                     nrelids = 
change->data.truncate_msg.nrelids;
+                                       int                     nrelations = 0;
+                                       Relation   *relations;
+
+                                       relations = palloc0(nrelids * 
sizeof(Relation));
+                                       for (i = 0; i < nrelids; i++)
+                                       {
+                                               Oid                     relid = 
change->data.truncate_msg.relids[i];
+                                               Relation        relation;
+
+                                               relation = 
RelationIdGetRelation(relid);
+
+                                               if (relation == NULL)
+                                                       elog(ERROR, "could not 
open relation with OID %u", relid);
+
+                                               if 
(!RelationIsLogicallyLogged(relation))
+                                                       continue;
+
+                                               relations[nrelations++] = 
relation;
+                                       }
+
+                                       rb->apply_truncate(rb, txn, nrelations, 
relations, change);
+
+                                       for (i = 0; i < nrelations; i++)
+                                               RelationClose(relations[i]);
+
+                                       break;
+                               }
+
                                case REORDER_BUFFER_CHANGE_MESSAGE:
                                        rb->message(rb, txn, change->lsn, true,
                                                                
change->data.msg.prefix,
@@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                                }
                                break;
                        }
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
@@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                                break;
                        }
                        /* the base struct contains all the data, easy peasy */
+               case REORDER_BUFFER_CHANGE_TRUNCATE:
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 700e25c36a..0052e4c569 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -32,7 +32,7 @@
 #define XLOG_HEAP_INSERT               0x00
 #define XLOG_HEAP_DELETE               0x10
 #define XLOG_HEAP_UPDATE               0x20
-/* 0x030 is free, was XLOG_HEAP_MOVE */
+#define XLOG_HEAP_TRUNCATE             0x30
 #define XLOG_HEAP_HOT_UPDATE   0x40
 #define XLOG_HEAP_CONFIRM              0x50
 #define XLOG_HEAP_LOCK                 0x60
@@ -109,6 +109,27 @@ typedef struct xl_heap_delete
 
 #define SizeOfHeapDelete       (offsetof(xl_heap_delete, flags) + 
sizeof(uint8))
 
+/*
+ * xl_heap_delete flag values, 8 bits are available.
+ */
+#define XLH_TRUNCATE_CASCADE                                   (1<<0)
+#define XLH_TRUNCATE_RESTART_SEQS                              (1<<1)
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ */
+typedef struct xl_heap_truncate
+{
+       Oid                     dbId;
+       uint32          nrelids;
+       uint8           flags;
+       Oid relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_heap_truncate;
+
+#define SizeOfHeapTruncate     (offsetof(xl_heap_truncate, relids))
+
 /*
  * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
  * or updated tuple in WAL; we can save a few bytes by reconstructing the
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 04a961d383..70ee3da76b 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, 
Oid relOid,
 extern void CheckTableNotInUse(Relation rel, const char *stmt);
 
 extern void ExecuteTruncate(TruncateStmt *stmt);
+extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List 
*relids_logged,
+                                                               DropBehavior 
behavior, bool restart_seqs);
 
 extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
 
diff --git a/src/include/replication/output_plugin.h 
b/src/include/replication/output_plugin.h
index 82875d6b3d..1ee0a56f03 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct 
LogicalDecodingContext *ctx,
                                                                           
Relation relation,
                                                                           
ReorderBufferChange *change);
 
+/*
+ * Callback for every TRUNCATE in a successful transaction.
+ */
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+                                                                               
 ReorderBufferTXN *txn,
+                                                                               
 int nrelations,
+                                                                               
 Relation relations[],
+                                                                               
 ReorderBufferChange *change);
+
 /*
  * Called for every (explicit or implicit) COMMIT of a successful transaction.
  */
@@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks
        LogicalDecodeStartupCB startup_cb;
        LogicalDecodeBeginCB begin_cb;
        LogicalDecodeChangeCB change_cb;
+       LogicalDecodeTruncateCB truncate_cb;
        LogicalDecodeCommitCB commit_cb;
        LogicalDecodeMessageCB message_cb;
        LogicalDecodeFilterByOriginCB filter_by_origin_cb;
diff --git a/src/include/replication/reorderbuffer.h 
b/src/include/replication/reorderbuffer.h
index aa430c843c..13a8798115 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -59,7 +59,8 @@ enum ReorderBufferChangeType
        REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
        REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
        REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
-       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
+       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+       REORDER_BUFFER_CHANGE_TRUNCATE
 };
 
 /*
@@ -128,6 +129,18 @@ typedef struct ReorderBufferChange
                        CommandId       cmax;
                        CommandId       combocid;
                }                       tuplecid;
+
+               /*
+                * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
+                * one set of relations to be truncated.
+                */
+               struct
+               {
+                       Size            nrelids;
+                       bool            cascade;
+                       bool            restart_seqs;
+                       Oid                *relids;
+               }       truncate_msg;
        }                       data;
 
        /*
@@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) (
                                                                                
        Relation relation,
                                                                                
        ReorderBufferChange *change);
 
+/* truncate callback signature */
+typedef void (*ReorderBufferApplyTruncateCB) (
+                                                                               
          ReorderBuffer *rb,
+                                                                               
          ReorderBufferTXN *txn,
+                                                                               
          int nrelations,
+                                                                               
          Relation relations[],
+                                                                               
          ReorderBufferChange *change);
+
 /* begin callback signature */
 typedef void (*ReorderBufferBeginCB) (
                                                                          
ReorderBuffer *rb,
@@ -328,6 +349,7 @@ struct ReorderBuffer
         */
        ReorderBufferBeginCB begin;
        ReorderBufferApplyChangeCB apply_change;
+       ReorderBufferApplyTruncateCB apply_truncate;
        ReorderBufferCommitCB commit;
        ReorderBufferMessageCB message;
 

base-commit: 0a64b45152b593c5eb95f2e88fbce7fbfe84ae7b
-- 
2.17.0

From 7ee846501f8532cc6b6ebc06b07b8555ae8a57f4 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Thu, 5 Apr 2018 11:47:06 -0400
Subject: [PATCH v19 2/2] Logical replication support for TRUNCATE

---
 doc/src/sgml/catalogs.sgml                  |   8 +
 doc/src/sgml/logical-replication.sgml       |  13 +-
 doc/src/sgml/protocol.sgml                  |  56 +++++++
 doc/src/sgml/ref/create_publication.sgml    |  10 +-
 src/backend/catalog/pg_publication.c        |   1 +
 src/backend/commands/publicationcmds.c      |  20 ++-
 src/backend/replication/logical/proto.c     |  55 +++++++
 src/backend/replication/logical/worker.c    |  68 +++++++++
 src/backend/replication/pgoutput/pgoutput.c | 129 +++++++++++-----
 src/backend/utils/cache/relcache.c          |   3 +-
 src/bin/pg_dump/pg_dump.c                   |  33 +++-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/pg_dump/t/002_pg_dump.pl            |   2 +-
 src/bin/psql/describe.c                     |  26 +++-
 src/include/catalog/pg_publication.h        |   7 +-
 src/include/replication/logicalproto.h      |   4 +
 src/test/regress/expected/publication.out   |  84 +++++-----
 src/test/subscription/t/010_truncate.pl     | 161 ++++++++++++++++++++
 18 files changed, 571 insertions(+), 110 deletions(-)
 create mode 100644 src/test/subscription/t/010_truncate.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d6a9d8c580..e8efa13e8d 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5518,6 +5518,14 @@ <title><structname>pg_publication</structname> 
Columns</title>
       <entry>If true, <command>DELETE</command> operations are replicated for
        tables in the publication.</entry>
      </row>
+
+     <row>
+      <entry><structfield>pubtruncate</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>If true, <command>TRUNCATE</command> operations are replicated for
+       tables in the publication.</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/logical-replication.sgml 
b/doc/src/sgml/logical-replication.sgml
index 75551d8ee1..151e773fc2 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -108,8 +108,8 @@ <title>Publication</title>
 
   <para>
    Publications can choose to limit the changes they produce to
-   any combination of <command>INSERT</command>, <command>UPDATE</command>, and
-   <command>DELETE</command>, similar to how triggers are fired by
+   any combination of <command>INSERT</command>, <command>UPDATE</command>,
+   <command>DELETE</command>, and <command>TRUNCATE</command>, similar to how 
triggers are fired by
    particular event types.  By default, all operation types are replicated.
   </para>
 
@@ -364,15 +364,6 @@ <title>Restrictions</title>
     </para>
    </listitem>
 
-   <listitem>
-    <para>
-     <command>TRUNCATE</command> commands are not replicated.  This can, of
-     course, be worked around by using <command>DELETE</command> instead.  To
-     avoid accidental <command>TRUNCATE</command> invocations, you can revoke
-     the <literal>TRUNCATE</literal> privilege from tables.
-    </para>
-   </listitem>
-
    <listitem>
     <para>
      Large objects (see <xref linkend="largeobjects"/>) are not replicated.
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b94dd4ac65..004b36084f 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6774,6 +6774,62 @@ <title>Logical Replication Message Formats</title>
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Truncate
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('T')
+</term>
+<listitem>
+<para>
+                Identifies the message as a truncate message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Number of relations
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Option bits for <command>TRUNCATE</command>:
+                1 for <literal>CASCADE</literal>, 2 for <literal>RESTART 
IDENTITY</literal>
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                ID of the relation corresponding to the ID in the relation
+                message.  This field is repeated for each relation.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 </variablelist>
 
 <para>
diff --git a/doc/src/sgml/ref/create_publication.sgml 
b/doc/src/sgml/ref/create_publication.sgml
index bfe12d5f41..99f87ca393 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -106,10 +106,11 @@ <title>Parameters</title>
           This parameter determines which DML operations will be published by
           the new publication to the subscribers.  The value is
           comma-separated list of operations.  The allowed operations are
-          <literal>insert</literal>, <literal>update</literal>, and
-          <literal>delete</literal>.  The default is to publish all actions,
+          <literal>insert</literal>, <literal>update</literal>,
+          <literal>delete</literal>, and <literal>truncate</literal>.
+          The default is to publish all actions,
           and so the default value for this option is
-          <literal>'insert, update, delete'</literal>.
+          <literal>'insert, update, delete, truncate'</literal>.
          </para>
         </listitem>
        </varlistentry>
@@ -168,8 +169,7 @@ <title>Notes</title>
   </para>
 
   <para>
-   <command>TRUNCATE</command> and <acronym>DDL</acronym> operations
-   are not published.
+   <acronym>DDL</acronym> operations are not published.
   </para>
  </refsect1>
 
diff --git a/src/backend/catalog/pg_publication.c 
b/src/backend/catalog/pg_publication.c
index ba18258ebb..ec3bd1d22d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -376,6 +376,7 @@ GetPublication(Oid pubid)
        pub->pubactions.pubinsert = pubform->pubinsert;
        pub->pubactions.pubupdate = pubform->pubupdate;
        pub->pubactions.pubdelete = pubform->pubdelete;
+       pub->pubactions.pubtruncate = pubform->pubtruncate;
 
        ReleaseSysCache(tup);
 
diff --git a/src/backend/commands/publicationcmds.c 
b/src/backend/commands/publicationcmds.c
index 9c5aa9ebc2..29992d4a0e 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -62,7 +62,8 @@ parse_publication_options(List *options,
                                                  bool *publish_given,
                                                  bool *publish_insert,
                                                  bool *publish_update,
-                                                 bool *publish_delete)
+                                                 bool *publish_delete,
+                                                 bool *publish_truncate)
 {
        ListCell   *lc;
 
@@ -72,6 +73,7 @@ parse_publication_options(List *options,
        *publish_insert = true;
        *publish_update = true;
        *publish_delete = true;
+       *publish_truncate = true;
 
        /* Parse options */
        foreach(lc, options)
@@ -96,6 +98,7 @@ parse_publication_options(List *options,
                        *publish_insert = false;
                        *publish_update = false;
                        *publish_delete = false;
+                       *publish_truncate = false;
 
                        *publish_given = true;
                        publish = defGetString(defel);
@@ -116,6 +119,8 @@ parse_publication_options(List *options,
                                        *publish_update = true;
                                else if (strcmp(publish_opt, "delete") == 0)
                                        *publish_delete = true;
+                               else if (strcmp(publish_opt, "truncate") == 0)
+                                       *publish_truncate = true;
                                else
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt)
        bool            publish_insert;
        bool            publish_update;
        bool            publish_delete;
+       bool            publish_truncate;
        AclResult       aclresult;
 
        /* must have CREATE privilege on database */
@@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt)
 
        parse_publication_options(stmt->options,
                                                          &publish_given, 
&publish_insert,
-                                                         &publish_update, 
&publish_delete);
+                                                         &publish_update, 
&publish_delete,
+                                                         &publish_truncate);
 
        values[Anum_pg_publication_puballtables - 1] =
                BoolGetDatum(stmt->for_all_tables);
@@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt)
                BoolGetDatum(publish_update);
        values[Anum_pg_publication_pubdelete - 1] =
                BoolGetDatum(publish_delete);
+       values[Anum_pg_publication_pubtruncate - 1] =
+               BoolGetDatum(publish_truncate);
 
        tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, 
Relation rel,
        bool            publish_insert;
        bool            publish_update;
        bool            publish_delete;
+       bool            publish_truncate;
        ObjectAddress obj;
 
        parse_publication_options(stmt->options,
                                                          &publish_given, 
&publish_insert,
-                                                         &publish_update, 
&publish_delete);
+                                                         &publish_update, 
&publish_delete,
+                                                         &publish_truncate);
 
        /* Everything ok, form a new tuple. */
        memset(values, 0, sizeof(values));
@@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, 
Relation rel,
 
                values[Anum_pg_publication_pubdelete - 1] = 
BoolGetDatum(publish_delete);
                replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+               values[Anum_pg_publication_pubtruncate - 1] = 
BoolGetDatum(publish_truncate);
+               replaces[Anum_pg_publication_pubtruncate - 1] = true;
        }
 
        tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
diff --git a/src/backend/replication/logical/proto.c 
b/src/backend/replication/logical/proto.c
index 948343e4ae..edc97a7662 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -26,6 +26,9 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define TRUNCATE_CASCADE               (1<<0)
+#define TRUNCATE_RESTART_SEQS  (1<<1)
+
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
                                           HeapTuple tuple);
@@ -292,6 +295,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData 
*oldtup)
        return relid;
 }
 
+/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+                                                 int nrelids,
+                                                 Oid relids[],
+                                                 bool cascade, bool 
restart_seqs)
+{
+       int                     i;
+       uint8 flags = 0;
+
+       pq_sendbyte(out, 'T');          /* action TRUNCATE */
+
+       pq_sendint32(out, nrelids);
+
+       /* encode and send truncate flags */
+       if (cascade)
+               flags |= TRUNCATE_CASCADE;
+       if (restart_seqs)
+               flags |= TRUNCATE_RESTART_SEQS;
+       pq_sendint8(out, flags);
+
+       for (i = 0; i < nrelids; i++)
+               pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+                                                bool *cascade, bool 
*restart_seqs)
+{
+       int                     i;
+       int                     nrelids;
+       List       *relids = NIL;
+       uint8 flags;
+
+       nrelids = pq_getmsgint(in, 4);
+
+       /* read and decode truncate flags */
+       flags = pq_getmsgint(in, 1);
+       *cascade = (flags & TRUNCATE_CASCADE) > 0;
+       *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+       for (i = 0; i < nrelids; i++)
+               relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+       return relids;
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index fdace7eea2..18766364f7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -30,10 +30,12 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 
+#include "commands/tablecmds.h"
 #include "commands/trigger.h"
 
 #include "executor/executor.h"
@@ -83,6 +85,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/timeout.h"
 #include "utils/tqual.h"
 #include "utils/syscache.h"
@@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s)
        CommandCounterIncrement();
 }
 
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+       bool     cascade = false;
+       bool     restart_seqs = false;
+       List    *remote_relids = NIL;
+       List    *remote_rels = NIL;
+       List    *rels = NIL;
+       List    *relids = NIL;
+       List    *relids_logged = NIL;
+       ListCell *lc;
+
+       ensure_transaction();
+
+       remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+       foreach(lc, remote_relids)
+       {
+               LogicalRepRelId relid = lfirst_oid(lc);
+               LogicalRepRelMapEntry *rel;
+
+               rel = logicalrep_rel_open(relid, RowExclusiveLock);
+               if (!should_apply_changes_for_rel(rel))
+               {
+                       /*
+                        * The relation can't become interesting in the middle 
of the
+                        * transaction so it's safe to unlock it.
+                        */
+                       logicalrep_rel_close(rel, RowExclusiveLock);
+                       continue;
+               }
+
+               remote_rels = lappend(remote_rels, rel);
+               rels = lappend(rels, rel->localrel);
+               relids = lappend_oid(relids, rel->localreloid);
+               if (RelationIsLogicallyLogged(rel->localrel))
+                       relids_logged = lappend_oid(relids, rel->localreloid);
+       }
+
+       /*
+        * Even if we used CASCADE on the upstream master we explicitly
+        * default to replaying changes without further cascading.
+        * This might be later changeable with a user specified option.
+        */
+       ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, 
restart_seqs);
+
+       foreach(lc, remote_rels)
+       {
+               LogicalRepRelMapEntry *rel = lfirst(lc);
+
+               logicalrep_rel_close(rel, NoLock);
+       }
+
+       CommandCounterIncrement();
+}
+
 
 /*
  * Logical replication protocol message dispatcher.
@@ -919,6 +983,10 @@ apply_dispatch(StringInfo s)
                case 'D':
                        apply_handle_delete(s);
                        break;
+                       /* TRUNCATE */
+               case 'T':
+                       apply_handle_truncate(s);
+                       break;
                        /* RELATION */
                case 'R':
                        apply_handle_relation(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c 
b/src/backend/replication/pgoutput/pgoutput.c
index aa9cf5b54e..e31ad7e027 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
 static void pgoutput_change(LogicalDecodingContext *ctx,
                                ReorderBufferTXN *txn, Relation rel,
                                ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+                                                         ReorderBufferTXN 
*txn, int nrelations, Relation relations[],
+                                                         ReorderBufferChange 
*change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                           RepOriginId origin_id);
 
@@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->startup_cb = pgoutput_startup;
        cb->begin_cb = pgoutput_begin_txn;
        cb->change_cb = pgoutput_change;
+       cb->truncate_cb = pgoutput_truncate;
        cb->commit_cb = pgoutput_commit_txn;
        cb->filter_by_origin_cb = pgoutput_origin_filter;
        cb->shutdown_cb = pgoutput_shutdown;
@@ -250,6 +254,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+/*
+ * Write the relation schema if the current schema haven't been sent yet.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+                                 Relation relation, RelationSyncEntry 
*relentry)
+{
+       if (!relentry->schema_sent)
+       {
+               TupleDesc       desc;
+               int                     i;
+
+               desc = RelationGetDescr(relation);
+
+               /*
+                * Write out type info if needed. We do that only for user 
created
+                * types.
+                */
+               for (i = 0; i < desc->natts; i++)
+               {
+                       Form_pg_attribute att = TupleDescAttr(desc, i);
+
+                       if (att->attisdropped)
+                               continue;
+
+                       if (att->atttypid < FirstNormalObjectId)
+                               continue;
+
+                       OutputPluginPrepareWrite(ctx, false);
+                       logicalrep_write_typ(ctx->out, att->atttypid);
+                       OutputPluginWrite(ctx, false);
+               }
+
+               OutputPluginPrepareWrite(ctx, false);
+               logicalrep_write_rel(ctx->out, relation);
+               OutputPluginWrite(ctx, false);
+               relentry->schema_sent = true;
+       }
+}
+
 /*
  * Sends the decoded DML over wire.
  */
@@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        /* Avoid leaking memory by using and resetting our own context */
        old = MemoryContextSwitchTo(data->context);
 
-       /*
-        * Write the relation schema if the current schema haven't been sent 
yet.
-        */
-       if (!relentry->schema_sent)
-       {
-               TupleDesc       desc;
-               int                     i;
-
-               desc = RelationGetDescr(relation);
-
-               /*
-                * Write out type info if needed. We do that only for user 
created
-                * types.
-                */
-               for (i = 0; i < desc->natts; i++)
-               {
-                       Form_pg_attribute att = TupleDescAttr(desc, i);
-
-                       if (att->attisdropped)
-                               continue;
-
-                       if (att->atttypid < FirstNormalObjectId)
-                               continue;
-
-                       OutputPluginPrepareWrite(ctx, false);
-                       logicalrep_write_typ(ctx->out, att->atttypid);
-                       OutputPluginWrite(ctx, false);
-               }
-
-               OutputPluginPrepareWrite(ctx, false);
-               logicalrep_write_rel(ctx->out, relation);
-               OutputPluginWrite(ctx, false);
-               relentry->schema_sent = true;
-       }
+       maybe_send_schema(ctx, relation, relentry);
 
        /* Send the data */
        switch (change->action)
@@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
        MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                 int nrelations, Relation relations[], 
ReorderBufferChange *change)
+{
+       PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       MemoryContext old;
+       RelationSyncEntry *relentry;
+       int                     i;
+       int                     nrelids;
+       Oid                *relids;
+
+       old = MemoryContextSwitchTo(data->context);
+
+       relids = palloc0(nrelations * sizeof(Oid));
+       nrelids = 0;
+
+       for (i = 0; i < nrelations; i++)
+       {
+               Relation        relation = relations[i];
+               Oid                     relid = RelationGetRelid(relation);
+
+               relentry = get_rel_sync_entry(data, relid);
+
+               if (!is_publishable_relation(relation))
+                       continue;
+
+               if (!relentry->pubactions.pubtruncate)
+                       continue;
+
+               relids[nrelids++] = relid;
+               maybe_send_schema(ctx, relation, relentry);
+       }
+
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_truncate(ctx->out,
+                                                         nrelids,
+                                                         relids,
+                                                         
change->data.truncate_msg.cascade,
+                                                         
change->data.truncate_msg.restart_seqs);
+       OutputPluginWrite(ctx, true);
+
+       MemoryContextSwitchTo(old);
+       MemoryContextReset(data->context);
+}
+
 /*
  * Currently we always forward.
  */
@@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                 * we only need to consider ones that the subscriber requested.
                 */
                entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-                       entry->pubactions.pubdelete = false;
+                       entry->pubactions.pubdelete = 
entry->pubactions.pubtruncate = false;
 
                foreach(lc, data->publications)
                {
@@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                                entry->pubactions.pubinsert |= 
pub->pubactions.pubinsert;
                                entry->pubactions.pubupdate |= 
pub->pubactions.pubupdate;
                                entry->pubactions.pubdelete |= 
pub->pubactions.pubdelete;
+                               entry->pubactions.pubtruncate |= 
pub->pubactions.pubtruncate;
                        }
 
                        if (entry->pubactions.pubinsert && 
entry->pubactions.pubupdate &&
-                               entry->pubactions.pubdelete)
+                               entry->pubactions.pubdelete && 
entry->pubactions.pubtruncate)
                                break;
                }
 
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index 69a2114a10..6a67c185b0 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation)
                pubactions->pubinsert |= pubform->pubinsert;
                pubactions->pubupdate |= pubform->pubupdate;
                pubactions->pubdelete |= pubform->pubdelete;
+               pubactions->pubtruncate |= pubform->pubtruncate;
 
                ReleaseSysCache(tup);
 
@@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation)
                 * other publications.
                 */
                if (pubactions->pubinsert && pubactions->pubupdate &&
-                       pubactions->pubdelete)
+                       pubactions->pubdelete && pubactions->pubtruncate)
                        break;
        }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index d066f4f00b..69016a6c4d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3712,6 +3712,7 @@ getPublications(Archive *fout)
        int                     i_pubinsert;
        int                     i_pubupdate;
        int                     i_pubdelete;
+       int                     i_pubtruncate;
        int                     i,
                                ntups;
 
@@ -3723,12 +3724,20 @@ getPublications(Archive *fout)
        resetPQExpBuffer(query);
 
        /* Get the publications. */
-       appendPQExpBuffer(query,
-                                         "SELECT p.tableoid, p.oid, p.pubname, 
"
-                                         "(%s p.pubowner) AS rolname, "
-                                         "p.puballtables, p.pubinsert, 
p.pubupdate, p.pubdelete "
-                                         "FROM pg_publication p",
-                                         username_subquery);
+       if (fout->remoteVersion >= 110000)
+               appendPQExpBuffer(query,
+                                                 "SELECT p.tableoid, p.oid, 
p.pubname, "
+                                                 "(%s p.pubowner) AS rolname, "
+                                                 "p.puballtables, p.pubinsert, 
p.pubupdate, p.pubdelete, p.pubtruncate "
+                                                 "FROM pg_publication p",
+                                                 username_subquery);
+       else
+               appendPQExpBuffer(query,
+                                                 "SELECT p.tableoid, p.oid, 
p.pubname, "
+                                                 "(%s p.pubowner) AS rolname, "
+                                                 "p.puballtables, p.pubinsert, 
p.pubupdate, p.pubdelete, false AS pubtruncate "
+                                                 "FROM pg_publication p",
+                                                 username_subquery);
 
        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
@@ -3742,6 +3751,7 @@ getPublications(Archive *fout)
        i_pubinsert = PQfnumber(res, "pubinsert");
        i_pubupdate = PQfnumber(res, "pubupdate");
        i_pubdelete = PQfnumber(res, "pubdelete");
+       i_pubtruncate = PQfnumber(res, "pubtruncate");
 
        pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3762,6 +3772,8 @@ getPublications(Archive *fout)
                        (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
                pubinfo[i].pubdelete =
                        (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+               pubinfo[i].pubtruncate =
+                       (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
 
                if (strlen(pubinfo[i].rolname) == 0)
                        write_msg(NULL, "WARNING: owner of publication \"%s\" 
appears to be invalid\n",
@@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
                first = false;
        }
 
+       if (pubinfo->pubtruncate)
+       {
+               if (!first)
+                       appendPQExpBufferStr(query, ", ");
+
+               appendPQExpBufferStr(query, "truncate");
+               first = false;
+       }
+
        appendPQExpBufferStr(query, "');\n");
 
        ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index a4d6d926a8..c2314758de 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -595,6 +595,7 @@ typedef struct _PublicationInfo
        bool            pubinsert;
        bool            pubupdate;
        bool            pubdelete;
+       bool            pubtruncate;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index acdfde2a1e..25852b903c 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2038,7 +2038,7 @@
                create_order => 50,
                create_sql   => 'CREATE PUBLICATION pub1;',
                regexp       => qr/^
-                       \QCREATE PUBLICATION pub1 WITH (publish = 'insert, 
update, delete');\E
+                       \QCREATE PUBLICATION pub1 WITH (publish = 'insert, 
update, delete, truncate');\E
                        /xm,
                like => {
                        %full_runs,
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0c3be1f504..75a1e42cee 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5187,7 +5187,7 @@ listPublications(const char *pattern)
        PQExpBufferData buf;
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
-       static const bool translate_columns[] = {false, false, false, false, 
false, false};
+       static const bool translate_columns[] = {false, false, false, false, 
false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -5207,13 +5207,17 @@ listPublications(const char *pattern)
                                          "  puballtables AS \"%s\",\n"
                                          "  pubinsert AS \"%s\",\n"
                                          "  pubupdate AS \"%s\",\n"
-                                         "  pubdelete AS \"%s\"\n",
+                                         "  pubdelete AS \"%s\"",
                                          gettext_noop("Name"),
                                          gettext_noop("Owner"),
                                          gettext_noop("All tables"),
                                          gettext_noop("Inserts"),
                                          gettext_noop("Updates"),
                                          gettext_noop("Deletes"));
+       if (pset.sversion >= 110000)
+               appendPQExpBuffer(&buf,
+                                                 ",\n  pubtruncate AS \"%s\"",
+                                                 gettext_noop("Truncates"));
 
        appendPQExpBufferStr(&buf,
                                                 "\nFROM 
pg_catalog.pg_publication\n");
@@ -5254,6 +5258,7 @@ describePublications(const char *pattern)
        PQExpBufferData buf;
        int                     i;
        PGresult   *res;
+       bool            has_pubtruncate;
 
        if (pset.sversion < 100000)
        {
@@ -5265,13 +5270,19 @@ describePublications(const char *pattern)
                return true;
        }
 
+       has_pubtruncate = (pset.sversion >= 110000);
+
        initPQExpBuffer(&buf);
 
        printfPQExpBuffer(&buf,
                                          "SELECT oid, pubname,\n"
                                          "  
pg_catalog.pg_get_userbyid(pubowner) AS owner,\n"
-                                         "  puballtables, pubinsert, 
pubupdate, pubdelete\n"
-                                         "FROM pg_catalog.pg_publication\n");
+                                         "  puballtables, pubinsert, 
pubupdate, pubdelete");
+       if (has_pubtruncate)
+               appendPQExpBuffer(&buf,
+                                                 ", pubtruncate");
+       appendPQExpBuffer(&buf,
+                                         "\nFROM pg_catalog.pg_publication\n");
 
        processSQLNamePattern(pset.db, &buf, pattern, false, false,
                                                  NULL, "pubname", NULL,
@@ -5317,6 +5328,9 @@ describePublications(const char *pattern)
                printTableOpt myopt = pset.popt.topt;
                printTableContent cont;
 
+               if (has_pubtruncate)
+                       ncols++;
+
                initPQExpBuffer(&title);
                printfPQExpBuffer(&title, _("Publication %s"), pubname);
                printTableInit(&cont, &myopt, title.data, ncols, nrows);
@@ -5326,12 +5340,16 @@ describePublications(const char *pattern)
                printTableAddHeader(&cont, gettext_noop("Inserts"), true, 
align);
                printTableAddHeader(&cont, gettext_noop("Updates"), true, 
align);
                printTableAddHeader(&cont, gettext_noop("Deletes"), true, 
align);
+               if (has_pubtruncate)
+                       printTableAddHeader(&cont, gettext_noop("Truncates"), 
true, align);
 
                printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
+               if (has_pubtruncate)
+                       printTableAddCell(&cont, PQgetvalue(res, i, 7), false, 
false);
 
                if (!puballtables)
                {
diff --git a/src/include/catalog/pg_publication.h 
b/src/include/catalog/pg_publication.h
index 37e77b8be7..b643c489cd 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -49,6 +49,9 @@ CATALOG(pg_publication,6104)
        /* true if deletes are published */
        bool            pubdelete;
 
+       /* true if truncates are published */
+       bool            pubtruncate;
+
 } FormData_pg_publication;
 
 /* ----------------
@@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication;
  * ----------------
  */
 
-#define Natts_pg_publication                           6
+#define Natts_pg_publication                           7
 #define Anum_pg_publication_pubname                    1
 #define Anum_pg_publication_pubowner           2
 #define Anum_pg_publication_puballtables       3
 #define Anum_pg_publication_pubinsert          4
 #define Anum_pg_publication_pubupdate          5
 #define Anum_pg_publication_pubdelete          6
+#define Anum_pg_publication_pubtruncate                7
 
 typedef struct PublicationActions
 {
        bool            pubinsert;
        bool            pubupdate;
        bool            pubdelete;
+       bool            pubtruncate;
 } PublicationActions;
 
 typedef struct Publication
diff --git a/src/include/replication/logicalproto.h 
b/src/include/replication/logicalproto.h
index 116f16f42d..92e88d3127 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation 
rel,
                                                HeapTuple oldtuple);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
                                           LogicalRepTupleData *oldtup);
+extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid 
relids[],
+                                               bool cascade, bool 
restart_seqs);
+extern List *logicalrep_read_truncate(StringInfo in,
+                                               bool *cascade, bool 
*restart_seqs);
 extern void logicalrep_write_rel(StringInfo out, Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
diff --git a/src/test/regress/expected/publication.out 
b/src/test/regress/expected/publication.out
index 0c86c647bc..afbbdd543d 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -21,20 +21,20 @@ ERROR:  unrecognized publication parameter: foo
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
 ERROR:  unrecognized "publish" value: "cluster"
 \dRp
-                                   List of publications
-        Name        |          Owner           | All tables | Inserts | 
Updates | Deletes 
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f          | t       | f      
 | f
- testpub_default    | regress_publication_user | f          | f       | t      
 | f
+                                         List of publications
+        Name        |          Owner           | All tables | Inserts | 
Updates | Deletes | Truncates 
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f      
 | f       | f
+ testpub_default    | regress_publication_user | f          | f       | t      
 | f       | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                   List of publications
-        Name        |          Owner           | All tables | Inserts | 
Updates | Deletes 
---------------------+--------------------------+------------+---------+---------+---------
- testpib_ins_trunct | regress_publication_user | f          | t       | f      
 | f
- testpub_default    | regress_publication_user | f          | t       | t      
 | t
+                                         List of publications
+        Name        |          Owner           | All tables | Inserts | 
Updates | Deletes | Truncates 
+--------------------+--------------------------+------------+---------+---------+---------+-----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f      
 | f       | f
+ testpub_default    | regress_publication_user | f          | t       | t      
 | t       | f
 (2 rows)
 
 --- adding tables
@@ -76,10 +76,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                  Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | t          | t       | t       | f
+                        Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | t          | t       | t       | f       | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
 CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 \dRp+ testpub3
-                        Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                              Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                        Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                              Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "public.testpub_tbl3"
 
@@ -119,10 +119,10 @@ ERROR:  relation "testpub_tbl1" is already member of 
publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                     Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -165,10 +165,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                     Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -210,10 +210,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                     Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes 
---------------------------+------------+---------+---------+---------
- regress_publication_user | f          | t       | t       | t
+                           Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -223,20 +223,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                               List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | 
Deletes 
--------------+--------------------------+------------+---------+---------+---------
- testpub_foo | regress_publication_user | f          | t       | t       | t
+                                     List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | 
Deletes | Truncates 
+-------------+--------------------------+------------+---------+---------+---------+-----------
+ testpub_foo | regress_publication_user | f          | t       | t       | t   
    | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                  List of publications
-      Name       |           Owner           | All tables | Inserts | Updates 
| Deletes 
------------------+---------------------------+------------+---------+---------+---------
- testpub_default | regress_publication_user2 | f          | t       | t       
| t
+                                        List of publications
+      Name       |           Owner           | All tables | Inserts | Updates 
| Deletes | Truncates 
+-----------------+---------------------------+------------+---------+---------+---------+-----------
+ testpub_default | regress_publication_user2 | f          | t       | t       
| t       | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
diff --git a/src/test/subscription/t/010_truncate.pl 
b/src/test/subscription/t/010_truncate.pl
new file mode 100644
index 0000000000..8ea4ab624f
--- /dev/null
+++ b/src/test/subscription/t/010_truncate.pl
@@ -0,0 +1,161 @@
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE SEQUENCE seq1 OWNED BY tab1.a"
+);
+$node_subscriber->safe_psql('postgres',
+       "ALTER SEQUENCE seq1 START 101"
+);
+
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr 
application_name=sub1' PUBLICATION pub1");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr 
application_name=sub2' PUBLICATION pub2");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr 
application_name=sub3' PUBLICATION pub3");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# insert data to truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), 
(3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# truncate and check
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+       'truncate replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT nextval('seq1')");
+is($result, qq(1),
+       'sequence not restarted');
+
+# truncate with restart identity
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT nextval('seq1')");
+is($result, qq(101),
+       'truncate restarted identities');
+
+# test publication that does not replicate truncate
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), 
(3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+       'truncate not replicated');
+
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||),
+       'truncate replicated after publication change');
+
+# test multiple tables connected by foreign keys
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), 
(3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), 
(111, 1), (22, 2)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4");
+
+$node_publisher->wait_for_catchup('sub3');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab3");
+is($result, qq(0||),
+       'truncate of multiple tables replicated');
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(x), max(x) FROM tab4");
+is($result, qq(0||),
+       'truncate of multiple tables replicated');
+
+# test truncate of multiple tables, some of which are not published
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2");
+
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), 
(3)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), 
(3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+       'truncate of multiple tables some not published');
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3),
+       'truncate of multiple tables some not published');
-- 
2.17.0

Reply via email to