On Wed, 2022-01-05 at 20:19 +0000, Simon Riggs wrote:
> I spoke with Jeff in detail about this patch in NYC Dec 21, and I now
> think it is worth pursuing. It seems much more likely that this would
> be acceptable than fully extensible rmgr.
Thank you. I had some conversations with others who felt this approach
is wasteful, which it is. But if this patch still has potential, I'm
happy to pursue it along with the extensible rmgr approach.

> So I'm signing up as a reviewer and we'll see if this is good to go.

Great, I attached a rebased version.

Regards,
Jeff Davis
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd862..ed6dff179be 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -18,7 +18,7 @@ OBJS = \ gistdesc.o \ hashdesc.o \ heapdesc.o \ - logicalmsgdesc.o \ + logicaldesc.o \ mxactdesc.o \ nbtdesc.o \ relmapdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c similarity index 59% rename from src/backend/access/rmgrdesc/logicalmsgdesc.c rename to src/backend/access/rmgrdesc/logicaldesc.c index 099e11a84e7..c2b0434a606 100644 --- a/src/backend/access/rmgrdesc/logicalmsgdesc.c +++ b/src/backend/access/rmgrdesc/logicaldesc.c @@ -1,22 +1,22 @@ /*------------------------------------------------------------------------- * - * logicalmsgdesc.c - * rmgr descriptor routines for replication/logical/message.c + * logicaldesc.c + * rmgr descriptor routines for replication/logical/logical_xlog.c * * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group * * * IDENTIFICATION - * src/backend/access/rmgrdesc/logicalmsgdesc.c + * src/backend/access/rmgrdesc/logicaldesc.c * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" void -logicalmsg_desc(StringInfo buf, XLogReaderState *record) +logical_desc(StringInfo buf, XLogReaderState *record) { char *rec = XLogRecGetData(record); uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; @@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record) sep = " "; } } + else if (info == XLOG_LOGICAL_INSERT) + { + + } + else if (info == XLOG_LOGICAL_UPDATE) + { + + } + else if (info == XLOG_LOGICAL_DELETE) + { + + } + else if (info == XLOG_LOGICAL_TRUNCATE) + { + + } } const char * -logicalmsg_identify(uint8 info) +logical_identify(uint8 info) { - if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE) - 
return "MESSAGE"; + switch (info) + { + case XLOG_LOGICAL_MESSAGE: + return "MESSAGE"; + case XLOG_LOGICAL_INSERT: + return "INSERT"; + case XLOG_LOGICAL_UPDATE: + return "UPDATE"; + case XLOG_LOGICAL_DELETE: + return "DELETE"; + case XLOG_LOGICAL_TRUNCATE: + return "TRUNCATE"; + default: + return NULL; + } return NULL; } diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b520..f31f7187ac4 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,7 +24,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb719..9fa281a1dca 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -19,7 +19,7 @@ OBJS = \ launcher.o \ logical.o \ logicalfuncs.o \ - message.o \ + logical_xlog.o \ origin.o \ proto.o \ relation.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1d22208c1ad..1abccd7190b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -37,7 +37,7 @@ #include "catalog/pg_control.h" #include "replication/decode.h" #include "replication/logical.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" @@ -56,16 +56,19 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void 
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* individual record(group)'s handlers */ -static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalMsg(LogicalDecodingContext *cxt, XLogRecordBuffer *buf); +static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase); @@ -154,8 +157,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeHeapOp(ctx, &buf); break; - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); + case RM_LOGICAL_ID: + DecodeLogicalOp(ctx, &buf); break; /* @@ -518,7 +521,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { case XLOG_HEAP_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr)) - DecodeInsert(ctx, buf); + DecodeHeapInsert(ctx, buf); break; /* @@ -616,35 +619,22 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) return filter_by_origin_cb_wrapper(ctx, origin_id); } -/* - * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). 
- */ static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; + SnapBuild *builder = ctx->snapshot_builder; TransactionId xid = XLogRecGetXid(r); - uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); Snapshot snapshot; xl_logical_message *message; - if (info != XLOG_LOGICAL_MESSAGE) - elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); - - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + message = (xl_logical_message *) XLogRecGetData(r); - /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. - */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) return; - message = (xl_logical_message *) XLogRecGetData(r); - if (message->dbId != ctx->slot->data.database || FilterByOrigin(ctx, origin_id)) return; @@ -664,6 +654,115 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * prefix */ message->message_size, message->message + message->prefix_size); + +} + +/* + * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs. + * + * Deletes can contain the new tuple. + */ +static void +DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + char *tupledata; + Size tuplelen; + XLogReaderState *r = buf->record; + xl_logical_insert *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_logical_insert *) XLogRecGetData(r); + tupledata = XLogRecGetData(r) + SizeOfLogicalInsert; + + /* + * Ignore insert records without new tuples (this does happen when + * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL). 
+ */ + if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) + return; + + /* only interested in our database */ + if (xlrec->node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE)) + change->action = REORDER_BUFFER_CHANGE_INSERT; + else + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + + tuplelen = xlrec->datalen - SizeOfHeapHeader; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(tupledata, xlrec->datalen, change->data.tp.newtuple); + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); +} + +/* + * Handle rmgr LOGICAL_ID records for DecodeRecordIntoReorderBuffer(). + */ +static void +DecodeLogicalOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + SnapBuild *builder = ctx->snapshot_builder; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding data changes. 
+ */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + switch (info) + { + case XLOG_LOGICAL_MESSAGE: + DecodeLogicalMsg(ctx, buf); + break; + + case XLOG_LOGICAL_INSERT: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeLogicalInsert(ctx, buf); + break; + + case XLOG_LOGICAL_UPDATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeUpdate(ctx, buf); + break; + + case XLOG_LOGICAL_DELETE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeDelete(ctx, buf); + break; + + case XLOG_LOGICAL_TRUNCATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeTruncate(ctx, buf); + break; + + default: + elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info); + break; + } } /* @@ -900,7 +999,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * Deletes can contain the new tuple. */ static void -DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { Size datalen; char *tupledata; diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/logical_xlog.c similarity index 50% rename from src/backend/replication/logical/message.c rename to src/backend/replication/logical/logical_xlog.c index b02363f0bda..efec90b3619 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/logical_xlog.c @@ -1,14 +1,14 @@ /*------------------------------------------------------------------------- * - * message.c - * Generic logical messages. + * logical_xlog.c + * Logical xlog records. * * Copyright (c) 2013-2022, PostgreSQL Global Development Group * * IDENTIFICATION - * src/backend/replication/logical/message.c + * src/backend/replication/logical/logical_xlog.c * - * NOTES + * Logical Messages * * Generic logical messages allow XLOG logging of arbitrary binary blobs that * get passed to the logical decoding plugin. 
In normal XLOG processing they @@ -26,16 +26,27 @@ * plugins. The plugin authors must take extra care to use unique prefix, * good options seems to be for example to use the name of the extension. * + * Logical Insert/Update/Delete/Truncate + * + * These records are intended to be used by non-heap table access methods that + * wish to support logical decoding and replication. They are treated + * similarly to the analogous heap records, but are not tied to physical pages + * or other details of the heap. These records are not processed during redo, + * so do not contribute to durability or physical replication; use generic WAL + * records for that. Note that using both logical WAL records and generic WAL + * records is redundant compared with the heap. + * * --------------------------------------------------------------------------- */ #include "postgres.h" +#include "access/heapam_xlog.h" #include "access/xact.h" #include "miscadmin.h" #include "nodes/execnodes.h" #include "replication/logical.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "utils/memutils.h" /* @@ -70,19 +81,75 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, /* allow origin filtering */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); + return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE); } +XLogRecPtr +LogLogicalInsert(Relation relation, TupleTableSlot *slot) +{ + bool shouldFree; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + xl_logical_insert xlrec; + xl_heap_header xlhdr; + XLogRecPtr recptr; + + /* force xid to be allocated */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + tuple->t_tableOid = slot->tts_tableOid; + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + xlrec.node = relation->rd_node; + xlrec.datalen = tuple->t_len; + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); + xlrec.flags = 0; + xlrec.flags |= 
XLH_INSERT_CONTAINS_NEW_TUPLE; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert); + + xlhdr.t_infomask2 = tuple->t_data->t_infomask2; + xlhdr.t_infomask = tuple->t_data->t_infomask; + xlhdr.t_hoff = tuple->t_data->t_hoff; + + XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader); + /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ + XLogRegisterData((char *) tuple->t_data + SizeofHeapTupleHeader, + tuple->t_len - SizeofHeapTupleHeader); + + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT); + + if (shouldFree) + pfree(tuple); + + return recptr; +} + + /* * Redo is basically just noop for logical decoding messages. */ void -logicalmsg_redo(XLogReaderState *record) +logical_redo(XLogReaderState *record) { uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - if (info != XLOG_LOGICAL_MESSAGE) - elog(PANIC, "logicalmsg_redo: unknown op code %u", info); + switch (info) + { + case XLOG_LOGICAL_MESSAGE: + case XLOG_LOGICAL_INSERT: + case XLOG_LOGICAL_UPDATE: + case XLOG_LOGICAL_DELETE: + case XLOG_LOGICAL_TRUNCATE: + break; + default: + elog(PANIC, "logical_redo: unknown op code %u", info); + } /* This is only interesting for logical decoding, see decode.c. 
*/ } diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 4f633888b4f..edb06b86d2d 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -28,7 +28,7 @@ #include "nodes/makefuncs.h" #include "replication/decode.h" #include "replication/logical.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore index 3be00a8b61f..567655fa626 100644 --- a/src/bin/pg_waldump/.gitignore +++ b/src/bin/pg_waldump/.gitignore @@ -10,7 +10,7 @@ /gistdesc.c /hashdesc.c /heapdesc.c -/logicalmsgdesc.c +/logicaldesc.c /mxactdesc.c /nbtdesc.c /relmapdesc.c diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1c..c7db22e70f1 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -26,7 +26,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index ed751aaf039..5a55b18810e 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, 
logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL) diff --git a/src/include/replication/message.h b/src/include/replication/logical_xlog.h similarity index 54% rename from src/include/replication/message.h rename to src/include/replication/logical_xlog.h index 7d7785292f1..bf20a740326 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/logical_xlog.h @@ -1,10 +1,10 @@ /*------------------------------------------------------------------------- - * message.h - * Exports from replication/logical/message.c + * logical_xlog.h + * Exports from replication/logical/logical_xlog.c * * Copyright (c) 2013-2022, PostgreSQL Global Development Group * - * src/include/replication/message.h + * src/include/replication/logical_xlog.h *------------------------------------------------------------------------- */ #ifndef PG_LOGICAL_MESSAGE_H @@ -13,6 +13,7 @@ #include "access/xlog.h" #include "access/xlogdefs.h" #include "access/xlogreader.h" +#include "storage/off.h" /* * Generic logical decoding message wal record. 
@@ -29,13 +30,34 @@ typedef struct xl_logical_message #define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) +/* This is what we need to know about insert */ +typedef struct xl_logical_insert +{ + RelFileNode node; + OffsetNumber offnum; /* inserted tuple's offset */ + Size datalen; + uint8 flags; + + /* xl_heap_header & TUPLE DATA in backup block 0 */ +} xl_logical_insert; + +#define SizeOfLogicalInsert (offsetof(xl_logical_insert, flags) + sizeof(uint8)) + +struct TupleTableSlot; + extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional); +extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot); /* RMGR API*/ #define XLOG_LOGICAL_MESSAGE 0x00 -void logicalmsg_redo(XLogReaderState *record); -void logicalmsg_desc(StringInfo buf, XLogReaderState *record); -const char *logicalmsg_identify(uint8 info); +#define XLOG_LOGICAL_INSERT 0x10 +#define XLOG_LOGICAL_UPDATE 0x20 +#define XLOG_LOGICAL_DELETE 0x30 +#define XLOG_LOGICAL_TRUNCATE 0x40 + +void logical_redo(XLogReaderState *record); +void logical_desc(StringInfo buf, XLogReaderState *record); +const char *logical_identify(uint8 info); #endif /* PG_LOGICAL_MESSAGE_H */