diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index f6e77fbda1..3749e06707 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -388,6 +388,80 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+/* print the tuple 'tuple' into the StringInfo s */
+static void
+ztuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, ZHeapTuple tuple, bool skip_nulls)
+{
+	int			natt;
+
+	/* print all columns individually */
+	for (natt = 0; natt < tupdesc->natts; natt++)
+	{
+		Form_pg_attribute attr; /* the attribute itself */
+		Oid			typid;		/* type of current attribute */
+		Oid			typoutput;	/* output function */
+		bool		typisvarlena;
+		Datum		origval;	/* possibly toasted Datum */
+		bool		isnull;		/* column is null? */
+
+		attr = TupleDescAttr(tupdesc, natt);
+
+		/*
+		 * don't print dropped columns, we can't be sure everything is
+		 * available for them
+		 */
+		if (attr->attisdropped)
+			continue;
+
+		/*
+		 * Don't print system columns, oid will already have been printed if
+		 * present.
+		 */
+		if (attr->attnum < 0)
+			continue;
+
+		typid = attr->atttypid;
+
+		/* get Datum from tuple */
+		origval = zheap_getattr(tuple, natt + 1, tupdesc, &isnull);
+
+		if (isnull && skip_nulls)
+			continue;
+
+		/* print attribute name */
+		appendStringInfoChar(s, ' ');
+		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
+
+		/* print attribute type */
+		appendStringInfoChar(s, '[');
+		appendStringInfoString(s, format_type_be(typid));
+		appendStringInfoChar(s, ']');
+
+		/* query output function */
+		getTypeOutputInfo(typid,
+			&typoutput, &typisvarlena);
+
+		/* print separator */
+		appendStringInfoChar(s, ':');
+
+		/* print data */
+		if (isnull)
+			appendStringInfoString(s, "null");
+		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
+			appendStringInfoString(s, "unchanged-toast-datum");
+		else if (!typisvarlena)
+			print_literal(s, typid,
+				OidOutputFunctionCall(typoutput, origval));
+		else
+		{
+			Datum		val;	/* definitely detoasted Datum */
+
+			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
+		}
+	}
+}
+
 /*
  * callback for individual changed tuples
  */
@@ -468,6 +542,27 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									&change->data.tp.oldtuple->tuple,
 									true);
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.ztp.newtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.newtuple->tuple,
+									 false);
+			break;
+		case REORDER_BUFFER_CHANGE_ZDELETE:
+			appendStringInfoString(ctx->out, " DELETE:");
+
+			/* if there was no PK, we only know that a delete happened */
+			if (change->data.ztp.oldtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			/* In DELETE, only the replica identity is present; display that */
+			else
+				ztuple_to_stringinfo(ctx->out, tupdesc,
+									 &change->data.ztp.oldtuple->tuple,
+									 true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/backend/access/zheap/zheapam.c b/src/backend/access/zheap/zheapam.c
index c916bde3fb..ac93c7d37a 100644
--- a/src/backend/access/zheap/zheapam.c
+++ b/src/backend/access/zheap/zheapam.c
@@ -115,6 +115,8 @@ static void zheap_lock_tuple_guts(Relation rel, Buffer buf, ZHeapTuple zhtup,
 					  TransactionId single_locker_xid, int single_locker_trans_slot,
 					  UndoRecPtr prev_urecptr, CommandId cid,
 					  bool any_multi_locker_member_alive);
+static ZHeapTuple ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp,
+						bool key_changed, bool *copy);
 static void compute_new_xid_infomask(ZHeapTuple zhtup, Buffer buf,
 						 TransactionId tup_xid, int tup_trans_slot,
 						 uint16 old_infomask, TransactionId add_to_xid,
@@ -1259,6 +1261,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	CommandId		tup_cid;
 	ItemId		lp;
 	ZHeapTupleData zheaptup;
+	ZHeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	UnpackedUndoRecord	undorecord;
 	Page		page;
 	BlockNumber blkno;
@@ -1280,6 +1283,7 @@ zheap_delete(Relation relation, ItemPointer tid,
 	bool		lock_reacquired;
 	bool		hasSubXactLock = false;
 	bool		hasPayload = false;
+	bool		old_key_copied = false;
 	xl_undolog_meta undometa;
 	uint8		vm_status;
 
@@ -1933,6 +1937,13 @@ zheap_tuple_updated:
 	vm_status = visibilitymap_get_status(relation,
 								BufferGetBlockNumber(buffer), &vmbuffer);
 
+	/*
+	 * Compute replica identity tuple before entering the critical section so
+	 * we don't PANIC upon a memory allocation failure.
+	 */
+	old_key_tuple = ZExtractReplicaIdentity(relation, &zheaptup, true,
+											&old_key_copied);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -1986,6 +1997,7 @@ zheap_tuple_updated:
 		XLogRecPtr	RedoRecPtr;
 		uint32		totalundotuplen = 0;
 		Size		dataoff;
+		int			bufflags = 0;
 		bool		doPageWrites;
 
 		/*
@@ -2006,6 +2018,15 @@ zheap_tuple_updated:
 			xlrec.flags |= XLZ_DELETE_IS_PARTITION_MOVE;
 		if (hasSubXactLock)
 			xlrec.flags |= XLZ_DELETE_CONTAINS_SUBXACT;
+		if (old_key_tuple != NULL)
+		{
+			bufflags |= REGBUF_KEEP_DATA;
+
+			if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_TUPLE;
+			else
+				xlrec.flags |= XLZ_DELETE_CONTAINS_OLD_KEY;
+		}
 
 		/*
 		 * If full_page_writes is enabled, and the buffer image is not
@@ -2051,7 +2072,27 @@ prepare_xlog:
 							totalundotuplen - SizeofZHeapTupleHeader);
 		}
 
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+
+		/*
+		 * Log replica identity of the deleted tuple if there is one
+		 */
+		if (old_key_tuple != NULL)
+		{
+			xl_zheap_header xlzhdr;
+
+			xlzhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+			xlzhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+			xlzhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+			XLogRegisterBufData(0, (char *) &xlzhdr, SizeOfZHeapHeader);
+			XLogRegisterBufData(0,
+								(char *) old_key_tuple->t_data +
+								SizeofZHeapTupleHeader,
+								old_key_tuple->t_len -
+								SizeofZHeapTupleHeader);
+		}
+
 		if (trans_slot_id > ZHEAP_PAGE_TRANS_SLOTS)
 			(void) RegisterTPDBuffer(page, 1);
 
@@ -2108,6 +2149,9 @@ prepare_xlog:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, &(zheaptup.t_self), LockTupleExclusive);
 
+	if (old_key_tuple != NULL && old_key_copied)
+		zheap_freetuple(old_key_tuple);
+
 	pgstat_count_heap_delete(relation);
 
 	return HeapTupleMayBeUpdated;
@@ -5846,6 +5890,102 @@ prepare_xlog:
 	UnlockReleaseTPDBuffers();
 }
 
+/*
+ * Build a zheap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log an identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static ZHeapTuple
+ZExtractReplicaIdentity(Relation relation, ZHeapTuple tp, bool key_changed,
+						bool *copy)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	Oid			replidindex;
+	Relation	idx_rel;
+	char		replident = relation->rd_rel->relreplident;
+	ZHeapTuple	key_tuple = NULL;
+	bool		nulls[MaxHeapAttributeNumber];
+	Datum		values[MaxHeapAttributeNumber];
+	int			natt;
+
+	*copy = false;
+
+	if (!RelationIsLogicallyLogged(relation))
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_NOTHING)
+		return NULL;
+
+	if (replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * When logging the entire old tuple, it very well could contain
+		 * toasted columns. If so, force them to be inlined.
+		 */
+		if (ZHeapTupleHasExternal(tp))
+		{
+			elog(ERROR, "toast tables are not supported with replica identity");
+		}
+		return tp;
+	}
+
+	/* if the key hasn't changed and we're only logging the key, we're done */
+	if (!key_changed)
+		return NULL;
+
+	/* find the replica identity index */
+	replidindex = RelationGetReplicaIndex(relation);
+	if (!OidIsValid(replidindex))
+	{
+		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
+			 RelationGetRelationName(relation));
+		return NULL;
+	}
+
+	idx_rel = RelationIdGetRelation(replidindex);
+
+	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
+	/* deform tuple, so we have fast access to columns */
+	zheap_deform_tuple(tp, desc, values, nulls);
+
+	/* set all columns to NULL, regardless of whether they actually are */
+	memset(nulls, 1, sizeof(nulls));
+
+	/*
+	 * Now set all columns contained in the index to NOT NULL, they cannot
+	 * currently be NULL.
+	 */
+	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
+	{
+		int			attno = idx_rel->rd_index->indkey.values[natt];
+
+		if (attno < 0)
+			elog(ERROR, "system column in index");
+		nulls[attno - 1] = false;
+	}
+
+	key_tuple = zheap_form_tuple(desc, values, nulls);
+	*copy = true;
+	RelationClose(idx_rel);
+
+	/*
+	 * If the tuple, which by here only contains indexed columns, still has
+	 * toasted columns, force them to be inlined. This is somewhat unlikely
+	 * since there's limits on the size of indexed columns, so we don't
+	 * duplicate toast_flatten_tuple()s functionality in the above loop over
+	 * the indexed columns, even if it would be more efficient.
+	 */
+	if (ZHeapTupleHasExternal(key_tuple))
+	{
+		elog(ERROR, "toast tables are not supported with replica identity");
+	}
+
+	return key_tuple;
+}
+
 /*
  * compute_new_xid_infomask - Given the old values of tuple header's infomask,
  * compute the new values for tuple header which includes lock mode, new
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 95153f4e29..9bed68a6e1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,9 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/zheap.h"
+#include "access/zheapam_xlog.h"
+#include "access/zhtup.h"
 
 #include "catalog/pg_control.h"
 
@@ -57,6 +60,7 @@ typedef struct XLogRecordBuffer
 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -74,6 +78,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+/* record handlers for zheap */
+static void DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -161,7 +170,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			break;
 		case RM_ZHEAP_ID:
 			/* Logical decoding is not yet implemented for zheap. */
-			Assert(0);
+			DecodeZHeapOp(ctx, &buf);
 			break;
 		case RM_ZHEAP2_ID:
 			/* Logical decoding is not yet implemented for zheap. */
@@ -510,6 +519,48 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr ZHEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeZHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	uint8		info = XLogRecGetInfo(buf->record) & XLOG_ZHEAP_OPMASK;
+	TransactionId xid = XLogRecGetXid(buf->record);
+	SnapBuild  *builder = ctx->snapshot_builder;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_ZHEAP_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZInsert(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeZDelete(ctx, buf);
+			break;
+
+		case XLOG_ZHEAP_LOCK:
+			/* we don't care about row level locks for now */
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_ZHEAP_ID record type: %u", info);
+			break;
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1068,3 +1119,150 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * Parse XLOG_ZHEAP_INSERT (not ZMULTI_INSERT!) records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeZInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	Size		datalen;
+	char	   *tupledata;
+	Size		tuplelen;
+	XLogReaderState *r = buf->record;
+	xl_zheap_insert *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_insert *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_zheap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLZ_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLZ_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_ZINSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	tuplelen = datalen - SizeOfZHeapHeader;
+
+	change->data.ztp.newtuple =
+		ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogZTuple(tupledata, datalen, change->data.ztp.newtuple);
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_ZHEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeZDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_zheap_delete *xlrec;
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+
+	xlrec = (xl_zheap_delete *) ((char *) XLogRecGetData(r) + SizeOfUndoHeader);
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_ZDELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.ztp.relnode, &target_node, sizeof(RelFileNode));
+
+	/* old primary key stored */
+	if (xlrec->flags & XLZ_DELETE_CONTAINS_OLD)
+	{
+		char	*tupledata;
+		Size	datalen;
+		Size	tuplelen;
+
+		tupledata = XLogRecGetBlockData(r, 0, &datalen);
+		tuplelen = datalen - SizeOfZHeapHeader;
+
+		change->data.ztp.oldtuple =
+			ReorderBufferGetZTupleBuf(ctx->reorder, tuplelen);
+
+		DecodeXLogZTuple(tupledata,
+						 datalen, change->data.ztp.oldtuple);
+	}
+
+	change->data.ztp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Read a ZHeapTuple as WAL logged by zheap_insert, zheap_update and
+ * zheap_delete (but not by zheap_multi_insert) into a tuplebuf.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ */
+static void
+DecodeXLogZTuple(char *data, Size len, ReorderBufferZTupleBuf *tuple)
+{
+	xl_zheap_header xlhdr;
+	int			datalen = len - SizeOfZHeapHeader;
+	ZHeapTupleHeader header;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofZHeapTupleHeader;
+	header = tuple->tuple.t_data;
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* we can only figure this out after reassembling the transactions */
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	/* data is not stored aligned, copy to aligned storage */
+	memcpy((char *) &xlhdr, data, SizeOfZHeapHeader);
+
+	memset(header, 0, SizeofZHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofZHeapTupleHeader,
+		   data + SizeOfZHeapHeader,
+		   datalen);
+
+	header->t_infomask = xlhdr.t_infomask;
+	header->t_infomask2 = xlhdr.t_infomask2;
+	header->t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 19451714da..525dc2b19d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -347,6 +347,25 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write zheap's INSERT to the output stream.
+ */
+void
+logicalrep_write_zinsert(StringInfo out, Relation rel, ZHeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+
+	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
+
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	//logicalrep_write_tuple(out, rel, newtuple);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466bade2..d2ef994899 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -393,6 +393,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+		case REORDER_BUFFER_CHANGE_ZDELETE:
+			if (change->data.ztp.newtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.newtuple);
+				change->data.ztp.newtuple = NULL;
+			}
+
+			if (change->data.ztp.oldtuple)
+			{
+				ReorderBufferReturnZTupleBuf(rb, change->data.ztp.oldtuple);
+				change->data.ztp.oldtuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			if (change->data.msg.prefix != NULL)
 				pfree(change->data.msg.prefix);
@@ -456,6 +470,37 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 	pfree(tuple);
 }
 
+/*
+ * Get a fresh ReorderBufferZTupleBuf fitting at least a tuple of size
+ * tuple_len (excluding header overhead).
+ */
+ReorderBufferZTupleBuf *
+ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len)
+{
+	ReorderBufferZTupleBuf *tuple;
+	Size		alloc_len;
+
+	alloc_len = tuple_len + SizeofZHeapTupleHeader;
+
+	tuple = (ReorderBufferZTupleBuf *)
+		MemoryContextAlloc(rb->tup_context,
+						   sizeof(ReorderBufferZTupleBuf) +
+						   alloc_len);
+	tuple->alloc_tuple_size = alloc_len;
+	tuple->tuple.t_data = ReorderBufferZTupleBufData(tuple);
+
+	return tuple;
+}
+
+/*
+ * Free an ReorderBufferZTupleBuf.
+ */
+void
+ReorderBufferReturnZTupleBuf(ReorderBuffer *rb, ReorderBufferZTupleBuf *tuple)
+{
+	pfree(tuple);
+}
+
 /*
  * Get an array for relids of truncated relations.
  *
@@ -1684,6 +1729,72 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						break;
 					}
 
+				case REORDER_BUFFER_CHANGE_ZINSERT:
+				case REORDER_BUFFER_CHANGE_ZDELETE:
+					{
+						Assert(snapshot_now);
+
+						reloid = RelidByRelfilenode(change->data.ztp.relnode.spcNode,
+													change->data.ztp.relnode.relNode);
+
+						if (reloid == InvalidOid &&
+							change->data.ztp.newtuple == NULL &&
+							change->data.ztp.oldtuple == NULL)
+							goto change_done;
+						else if (reloid == InvalidOid)
+							elog(ERROR, "could not map filenode \"%s\" to relation OID",
+								relpathperm(change->data.ztp.relnode,
+									MAIN_FORKNUM));
+
+						relation = RelationIdGetRelation(reloid);
+
+						if (relation == NULL)
+							elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+								 reloid,
+								 relpathperm(change->data.ztp.relnode,
+											 MAIN_FORKNUM));
+
+						if (!RelationIsLogicallyLogged(relation))
+							goto zchange_done;
+
+						/*
+						 * Ignore temporary heaps created during DDL unless the
+						 * plugin has asked for them.
+						 */
+						if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+							goto zchange_done;
+
+						/*
+						 * For now ignore sequence changes entirely. Most of the
+						 * time they don't log changes using records we
+						 * understand, so it doesn't make sense to handle the few
+						 * cases we do.
+						 */
+						if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+							goto zchange_done;
+
+						/* user-triggered change */
+						if (!IsToastRelation(relation))
+						{
+							rb->apply_change(rb, txn, relation, change);
+						}
+						else if (change->action == REORDER_BUFFER_CHANGE_ZINSERT)
+						{
+							/* toast table implementation for zheap is not done yet. */
+							elog(ERROR,"decoding for toast tables not supported in zheap");
+						}
+
+					zchange_done:
+
+						if (relation != NULL)
+						{
+							RelationClose(relation);
+							relation = NULL;
+						}
+					}
+
+					break;
+
 				case REORDER_BUFFER_CHANGE_MESSAGE:
 					rb->message(rb, txn, change->lsn, true,
 								change->data.msg.prefix,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86e0951a70..303a3b8303 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -365,6 +365,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
+		case REORDER_BUFFER_CHANGE_ZINSERT:
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_zinsert(ctx->out, relation,
+									 &change->data.ztp.newtuple->tuple);
+			OutputPluginWrite(ctx, true);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/include/access/zheapam_xlog.h b/src/include/access/zheapam_xlog.h
index 004aea495e..a023b9a7bc 100644
--- a/src/include/access/zheapam_xlog.h
+++ b/src/include/access/zheapam_xlog.h
@@ -124,6 +124,12 @@ typedef struct xl_zheap_insert
 #define XLZ_DELETE_CONTAINS_TPD_SLOT			(1<<2)
 #define XLZ_DELETE_CONTAINS_SUBXACT				(1<<3)
 #define XLZ_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLZ_DELETE_CONTAINS_OLD_TUPLE			(1<<5)
+#define XLZ_DELETE_CONTAINS_OLD_KEY				(1<<6)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLZ_DELETE_CONTAINS_OLD						\
+	(XLZ_DELETE_CONTAINS_OLD_TUPLE | XLZ_DELETE_CONTAINS_OLD_KEY)
 
 /* This is what we need to know about delete */
 typedef struct xl_zheap_delete
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 8192f79ce3..587472ee06 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -106,4 +106,7 @@ extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
 
+extern void logicalrep_write_zinsert(StringInfo out, Relation rel,
+						ZHeapTuple newtuple);
+
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7787edf7b6..837aa70d75 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/zhtup.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -36,6 +37,25 @@ typedef struct ReorderBufferTupleBuf
 #define ReorderBufferTupleBufData(p) \
 	((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
 
+/* an individual zheap tuple, stored in one chunk of memory */
+typedef struct ReorderBufferZTupleBuf
+{
+	/* position in preallocated list */
+	slist_node	node;
+
+	/* tuple header, the interesting bit for users of logical decoding */
+	ZHeapTupleData tuple;
+
+	/* pre-allocated size of tuple buffer, different from tuple size */
+	Size		alloc_tuple_size;
+
+	/* actual tuple data follows */
+} ReorderBufferZTupleBuf;
+
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferZTupleBufData(p) \
+	((ZHeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferZTupleBuf)))
+
 /*
  * Types of the change passed to a 'change' callback.
  *
@@ -60,7 +80,12 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_ZINSERT,
+	REORDER_BUFFER_CHANGE_ZUPDATE,
+	REORDER_BUFFER_CHANGE_ZDELETE,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZINSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ZCONFIRM
 };
 
 /*
@@ -100,6 +125,20 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			/* relation that has been changed */
+			RelFileNode relnode;
+
+			/* no previously reassembled toast chunks are necessary anymore */
+			bool		clear_toast_afterwards;
+
+			/* valid for DELETE || UPDATE */
+			ReorderBufferZTupleBuf *oldtuple;
+			/* valid for INSERT || UPDATE */
+			ReorderBufferZTupleBuf *newtuple;
+		}			ztp;
+
 		/*
 		 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
 		 * set of relations to be truncated.
@@ -399,6 +438,8 @@ void		ReorderBufferFree(ReorderBuffer *);
 
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+ReorderBufferZTupleBuf *ReorderBufferGetZTupleBuf(ReorderBuffer *rb, Size tuple_len);
+void		ReorderBufferReturnZTupleBuf(ReorderBuffer *, ReorderBufferZTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
