Attached is a WIP patch to add new WAL records to represent a logical
insert, update, or delete. These records do not do anything at REDO
time; they are only processed during logical decoding/replication.

These are intended to be used by a custom table AM, like my columnar
compression extension[0], which currently supports physical replication
but can't support logical decoding/replication because decoding is not
extensible. Using these new logical records would be redundant, making
inserts/updates/deletes less efficient, but at least logical decoding
would work (the lack of which is columnar's biggest weakness).

Alternatively, we could support extensible WAL with extensible
decoding. I also like this approach, but it takes more work for an AM
like columnar to get that right -- it needs to keep additional state in
the walsender to track and assemble the compressed columns stored
across many blocks. It also requires a lot of care, because mistakes
can get you into serious trouble.

This proposal, for new logical records without WAL extensibility,
provides a gentler ramp to get a table AM working (including
logical replication/decoding) without the need to invest in the WAL
design. Later, of course, I'd like the option for extensible WAL as well
(to be more efficient), but right now I'd prefer it just worked
(inefficiently).

The patch is still very rough, but I tried it on simple insert cases in my
columnar[0] extension (which only supports insert, not update/delete).
I'm looking for some review of the approach and structure before I
polish and test it. Note that my main test case is columnar, which
doesn't support update/delete. Also note that the patch is against v14
(for now).

Regards,
        Jeff Davis

[0] https://github.com/citusdata/citus/tree/master/src/backend/columnar
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
 	gistdesc.o \
 	hashdesc.o \
 	heapdesc.o \
-	logicalmsgdesc.o \
+	logicaldesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index d64ce2e7eff..079ab5a847e 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
 /*-------------------------------------------------------------------------
  *
- * logicalmsgdesc.c
- *	  rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ *	  rmgr descriptor routines for replication/logical/logical_xlog.c
  *
  * Portions Copyright (c) 2015-2021, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *	  src/backend/access/rmgrdesc/logicaldesc.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 
 void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
 {
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
 			sep = " ";
 		}
 	}
+	else if (info == XLOG_LOGICAL_INSERT)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_UPDATE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_DELETE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_TRUNCATE)
+	{
+
+	}
 }
 
 const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
 {
-	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
-		return "MESSAGE";
+	switch (info & ~XLR_INFO_MASK)	/* strip XLR_* flag bits, as before */
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			return "MESSAGE";
+		case XLOG_LOGICAL_INSERT:
+			return "INSERT";
+		case XLOG_LOGICAL_UPDATE:
+			return "UPDATE";
+		case XLOG_LOGICAL_DELETE:
+			return "DELETE";
+		case XLOG_LOGICAL_TRUNCATE:
+			return "TRUNCATE";
+		default:
+			return NULL;
+	}
 
 	return NULL;
 }
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..f31f7187ac4 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,7 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
-	message.o \
+	logical_xlog.o \
 	origin.o \
 	proto.o \
 	relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 453efc51e16..fb9be9c71ae 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -37,7 +37,7 @@
 #include "catalog/pg_control.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
@@ -56,16 +56,19 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
+static void DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid,
 						 bool two_phase);
@@ -154,8 +157,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeHeapOp(ctx, &buf);
 			break;
 
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
+		case RM_LOGICAL_ID:
+			DecodeLogicalOp(ctx, &buf);
 			break;
 
 			/*
@@ -518,7 +521,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		case XLOG_HEAP_INSERT:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeInsert(ctx, buf);
+				DecodeHeapInsert(ctx, buf);
 			break;
 
 			/*
@@ -616,35 +619,22 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
-/*
- * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
- */
 static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
-	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
 	TransactionId xid = XLogRecGetXid(r);
-	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
 	Snapshot	snapshot;
 	xl_logical_message *message;
 
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
-
-	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+	message = (xl_logical_message *) XLogRecGetData(r);
 
-	/*
-	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding messages.
-	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-		ctx->fast_forward)
+	if (message->transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
 
-	message = (xl_logical_message *) XLogRecGetData(r);
-
 	if (message->dbId != ctx->slot->data.database ||
 		FilterByOrigin(ctx, origin_id))
 		return;
@@ -664,6 +654,115 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 												 * prefix */
 							  message->message_size,
 							  message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_LOGICAL_INSERT records into tuplebufs.
+ *
+ * The tuple data follows the fixed-size xl_logical_insert header.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	char	   *tupledata;
+	Size		tuplelen;
+	XLogReaderState *r = buf->record;
+	xl_logical_insert *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_insert *) XLogRecGetData(r);
+	tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+
+	/*
+	 * Ignore insert records without new tuples (this does happen when
+	 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+	 */
+	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+		return;
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	tuplelen = xlrec->datalen - SizeOfHeapHeader;
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogTuple(tupledata, xlrec->datalen, change->data.tp.newtuple);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change,
+							 xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
+}
+
+/*
+ * Handle rmgr LOGICAL_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			DecodeLogicalMsg(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalInsert(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_UPDATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeUpdate(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeDelete(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_TRUNCATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeTruncate(ctx, buf);
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+			break;
+	}
 }
 
 /*
@@ -900,7 +999,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
  * Deletes can contain the new tuple.
  */
 static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	Size		datalen;
 	char	   *tupledata;
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/logical_xlog.c
similarity index 50%
rename from src/backend/replication/logical/message.c
rename to src/backend/replication/logical/logical_xlog.c
index 93bd372421a..36bbfc8f17e 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -1,14 +1,14 @@
 /*-------------------------------------------------------------------------
  *
- * message.c
- *	  Generic logical messages.
+ * logical_xlog.c
+ *	  Logical xlog records.
  *
  * Copyright (c) 2013-2021, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *	  src/backend/replication/logical/message.c
+ *	  src/backend/replication/logical/logical_xlog.c
  *
- * NOTES
+ * Logical Messages
  *
  * Generic logical messages allow XLOG logging of arbitrary binary blobs that
  * get passed to the logical decoding plugin. In normal XLOG processing they
@@ -26,16 +26,27 @@
  * plugins. The plugin authors must take extra care to use unique prefix,
  * good options seems to be for example to use the name of the extension.
  *
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so they do not contribute to durability or physical replication; use generic
+ * WAL records for that. Note that writing both logical and generic WAL records
+ * logs each change twice, which is redundant compared with the heap.
+ *
  * ---------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "nodes/execnodes.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "utils/memutils.h"
 
 /*
@@ -70,19 +81,75 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
 	/* allow origin filtering */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+	return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
 }
 
+/*
+ * Write an XLOG_LOGICAL_INSERT record for the tuple in 'slot', inserted into
+ * 'relation', and return XLogInsert()'s result.
+ */
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *slot)
+{
+	bool		shouldFree;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	xl_logical_insert xlrec;
+	xl_heap_header xlhdr;
+	XLogRecPtr recptr;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	tuple->t_tableOid = slot->tts_tableOid;
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	/* datalen = bytes after xlrec: xl_heap_header + tuple sans fixed header */
+	xlrec.node = relation->rd_node;
+	xlrec.datalen = SizeOfHeapHeader + tuple->t_len - SizeofHeapTupleHeader;
+	xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+	xlrec.flags = XLH_INSERT_CONTAINS_NEW_TUPLE;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+	xlhdr.t_infomask2 = tuple->t_data->t_infomask2;
+	xlhdr.t_infomask = tuple->t_data->t_infomask;
+	xlhdr.t_hoff = tuple->t_data->t_hoff;
+	XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+	XLogRegisterData((char *) tuple->t_data + SizeofHeapTupleHeader,
+					 tuple->t_len - SizeofHeapTupleHeader);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+	if (shouldFree)
+		pfree(tuple);
+	return recptr;
+}
+
+
 /*
  * Redo is basically just noop for logical decoding messages.
  */
 void
-logicalmsg_redo(XLogReaderState *record)
+logical_redo(XLogReaderState *record)
 {
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+		case XLOG_LOGICAL_INSERT:
+		case XLOG_LOGICAL_UPDATE:
+		case XLOG_LOGICAL_DELETE:
+		case XLOG_LOGICAL_TRUNCATE:
+			break;
+		default:
+			elog(PANIC, "logical_redo: unknown op code %u", info);
+	}
 
 	/* This is only interesting for logical decoding, see decode.c. */
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 1f38c5b33ea..b84e3971e5e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -28,7 +28,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "storage/fd.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
 /gistdesc.c
 /hashdesc.c
 /heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
 /mxactdesc.c
 /nbtdesc.c
 /relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..c7db22e70f1 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f6..48cba42f561 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL)
diff --git a/src/include/replication/message.h b/src/include/replication/logical_xlog.h
similarity index 54%
rename from src/include/replication/message.h
rename to src/include/replication/logical_xlog.h
index d3fb324c816..72c572eaae5 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/logical_xlog.h
@@ -1,10 +1,10 @@
 /*-------------------------------------------------------------------------
- * message.h
- *	   Exports from replication/logical/message.c
+ * logical_xlog.h
+ *	   Exports from replication/logical/logical_xlog.c
  *
  * Copyright (c) 2013-2021, PostgreSQL Global Development Group
  *
- * src/include/replication/message.h
+ * src/include/replication/logical_xlog.h
  *-------------------------------------------------------------------------
  */
 #ifndef PG_LOGICAL_MESSAGE_H
@@ -13,6 +13,7 @@
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
 #include "access/xlogreader.h"
+#include "storage/off.h"
 
 /*
  * Generic logical decoding message wal record.
@@ -29,13 +30,34 @@ typedef struct xl_logical_message
 
 #define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
 
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+	RelFileNode		node;
+	OffsetNumber	offnum;		/* inserted tuple's offset */
+	Size			datalen;
+	uint8			flags;
+
+	/* xl_heap_header & TUPLE DATA follow in the record's main data */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert	(offsetof(xl_logical_insert, flags) + sizeof(uint8))
+
+struct TupleTableSlot;
+
 extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
 									size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
 
 /* RMGR API*/
 #define XLOG_LOGICAL_MESSAGE	0x00
-void		logicalmsg_redo(XLogReaderState *record);
-void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
+#define XLOG_LOGICAL_INSERT		0x10
+#define XLOG_LOGICAL_UPDATE		0x20
+#define XLOG_LOGICAL_DELETE		0x30
+#define XLOG_LOGICAL_TRUNCATE	0x40
+
+void		logical_redo(XLogReaderState *record);
+void		logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
 
 #endif							/* PG_LOGICAL_MESSAGE_H */

Reply via email to