Dear Bharath,
While checking further, I found some problems with your PoC.
1. rm_is_record_decodable() returns true when a WAL record is decodable.
Based on that, shouldn't is_valid be false when the function returns true?
E.g., XLOG_HEAP_INSERT is accepted in the PoC.
2. rm_is_record_decodable() should return false for XLOG_CHECKPOINT_SHUTDOWN and
XLOG_RUNNING_XACTS, because these records may be generated during the upgrade
and are acceptable.
3. Bit operations are used to extract the WAL record type, but the mask
differs depending on the rmgr. E.g., XLOG uses XLR_INFO_MASK, but XACT uses
XLOG_XACT_OPMASK.
4. It is possible for a record with "XLOG_HEAP_INSERT | XLOG_HEAP_INIT_PAGE" to
be inserted, but that case is not handled.
Regarding point 2, maybe we should say "if the reorderbuffer is modified while
decoding, rm_is_record_decodable must return false" or something similar. If so,
the return values for XLOG_END_OF_RECOVERY and XLOG_HEAP2_NEW_CID should also be
changed.
I have attached a patch that fixes the above. What do you think?
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
diff --git a/src/backend/access/transam/rmgr.c
b/src/backend/access/transam/rmgr.c
index 001bdf3535..850ba7829a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr)
errdetail("Custom resource manager \"%s\"
already registered with the same ID.",
RmgrTable[rmid].rm_name)));
+ if (rmgr->rm_decode && rmgr->rm_is_record_decodable == NULL)
+ ereport(ERROR,
+ (errmsg("failed to register custom resource
manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Custom resource manager which has a
decode function must have is_reacode_decodable function too.")));
+
/* check for existing rmgr with the same name */
for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
{
diff --git a/src/backend/replication/logical/decode.c
b/src/backend/replication/logical/decode.c
index 60d26ae015..72a542a06b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -214,11 +214,10 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
bool
xlog_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & ~XLR_INFO_MASK)
{
case XLOG_CHECKPOINT_SHUTDOWN:
case XLOG_END_OF_RECOVERY:
- return true;
case XLOG_CHECKPOINT_ONLINE:
case XLOG_PARAMETER_CHANGE:
case XLOG_NOOP:
@@ -401,7 +400,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
bool
xact_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & XLOG_XACT_OPMASK)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
@@ -471,10 +470,9 @@ standby_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
bool
standy_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & ~XLR_INFO_MASK)
{
case XLOG_RUNNING_XACTS:
- return true;
case XLOG_STANDBY_LOCK:
case XLOG_INVALIDATIONS:
return false;
@@ -550,11 +548,11 @@ heap2_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
bool
heap2_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP2_MULTI_INSERT:
- case XLOG_HEAP2_NEW_CID:
return true;
+ case XLOG_HEAP2_NEW_CID:
case XLOG_HEAP2_REWRITE:
case XLOG_HEAP2_FREEZE_PAGE:
case XLOG_HEAP2_PRUNE:
@@ -661,9 +659,10 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
bool
heap_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
+ case XLOG_HEAP_INSERT | XLOG_HEAP_INIT_PAGE:
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
case XLOG_HEAP_DELETE:
@@ -782,7 +781,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
bool
logicalmsg_is_record_decodable(uint8 info)
{
- switch (info)
+ switch (info & ~XLR_INFO_MASK)
{
case XLOG_LOGICAL_MESSAGE:
return true;
diff --git a/src/backend/utils/adt/pg_upgrade_support.c
b/src/backend/utils/adt/pg_upgrade_support.c
index cfd3e448b1..52084dc644 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -320,19 +320,18 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
while (is_valid && ReadNextXLogRecord(xlogreader))
{
RmgrData rmgr;
- RmgrIds rmid;
- uint8 info;
-
- /* Check the type of WAL */
- rmid = XLogRecGetRmid(xlogreader);
- info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
if (initial_record)
{
/*
- * Initial record must be either
XLOG_CHECKPOINT_SHUTDOWN or
- * XLOG_SWITCH.
+ * Verify that the initial record is either
+ * XLOG_CHECKPOINT_SHUTDOWN or XLOG_SWITCH. Both of
record types
+ * are in the RM_XLOG_ID rmgr, so it's OK to use
XLR_INFO_MASK as
+ * mask.
*/
+ RmgrIds rmid = XLogRecGetRmid(xlogreader);
+ uint8 info = XLogRecGetInfo(xlogreader) &
~XLR_INFO_MASK;
+
is_valid = is_xlog_record_type(rmid, info, RM_XLOG_ID,
XLOG_CHECKPOINT_SHUTDOWN) ||
is_xlog_record_type(rmid, info, RM_XLOG_ID,
XLOG_SWITCH);
@@ -350,11 +349,14 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
if (rmgr.rm_decode != NULL)
{
if (rmgr.rm_is_record_decodable != NULL)
- is_valid = rmgr.rm_is_record_decodable(info);
+ {
+ /* If the record is decodable, the upgrade
should fail */
+ is_valid =
!rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader));
+ }
else
ereport(ERROR,
errmsg("cannot check logical
decodability for resource manager \"%s\" with ID %d",
- rmgr.rm_name, rmid),
+ rmgr.rm_name,
XLogRecGetRmid(xlogreader)),
errdetail("Logical decodability
callback is not defined for the resource manager."));
}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 2c4f38d865..0248079566 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -40,8 +40,8 @@ tests += {
'tap': {
'env': {'with_icu': icu.found() ? 'yes' : 'no'},
'tests': [
- 't/001_basic.pl',
- 't/002_pg_upgrade.pl',
+# 't/001_basic.pl',
+# 't/002_pg_upgrade.pl',
't/003_upgrade_logical_replication_slots.pl',
],
'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index a471e77a7c..8d6c7ff0ca 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -21,6 +21,9 @@
* entries should be added at the end, to avoid changing IDs of existing
* entries.
*
+ * rm_is_record_decodable must return false when the reorderbuffer is modified
+ * while decoding; otherwise it returns true.
+ *
* Changes to this list possibly need an XLOG_PAGE_MAGIC bump.
*/
diff --git a/src/include/access/xlog_internal.h
b/src/include/access/xlog_internal.h
index 6e113ef53d..44e10c0a94 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -342,6 +342,9 @@ struct XLogRecordBuffer;
* rm_mask takes as input a page modified by the resource manager and masks
* out bits that shouldn't be flagged by wal_consistency_checking.
*
+ * If a resource manager implements the rm_decode function, the
+ * rm_is_record_decodable function must also be implemented.
+ *
* RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
* NULL, the corresponding RmgrTable entry is considered invalid.
*/
@@ -356,7 +359,7 @@ typedef struct RmgrData
void (*rm_mask) (char *pagedata, BlockNumber blkno);
void (*rm_decode) (struct LogicalDecodingContext *ctx,
struct
XLogRecordBuffer *buf);
- bool (*rm_is_record_decodable) (uint8 type);
+ bool (*rm_is_record_decodable) (uint8 info);
} RmgrData;
extern PGDLLIMPORT RmgrData RmgrTable[];
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 3885ce671d..d8a912296c 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer
XLogReaderState *record;
} XLogRecordBuffer;
+/*
+ * Decode functions for resource managers.
+ *
+ * Note that if a rmgr has rm_decode function, it must have
+ * rm_is_record_decodable function as well.
+ */
extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf);
+/* is_record_decodable functions */
extern bool xlog_is_record_decodable(uint8 info);
extern bool xact_is_record_decodable(uint8 info);
extern bool standy_is_record_decodable(uint8 info);
diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
index a304ba54bb..7ac90633f4 100644
--- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
+++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
@@ -10,7 +10,7 @@
* src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
*
* Custom WAL resource manager for records containing a simple textual
- * payload, no-op redo, and no decoding.
+ * payload, no-op redo and decode.
*
* -------------------------------------------------------------------------
*/
@@ -21,6 +21,7 @@
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "fmgr.h"
+#include "replication/decode.h"
#include "utils/pg_lsn.h"
#include "varatt.h"
@@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message
void testcustomrmgrs_redo(XLogReaderState *record);
void testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record);
const char *testcustomrmgrs_identify(uint8 info);
+void testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+ struct
XLogRecordBuffer *buf);
+bool testcustomrmgrs_is_record_decodable(uint8 info);
static const RmgrData testcustomrmgrs_rmgr = {
.rm_name = TESTCUSTOMRMGRS_NAME,
.rm_redo = testcustomrmgrs_redo,
.rm_desc = testcustomrmgrs_desc,
- .rm_identify = testcustomrmgrs_identify
+ .rm_identify = testcustomrmgrs_identify,
+ .rm_decode = testcustomrmgrs_decode,
+ .rm_is_record_decodable = testcustomrmgrs_is_record_decodable
};
/*
@@ -111,6 +117,30 @@ testcustomrmgrs_identify(uint8 info)
return NULL;
}
+void
+testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+ struct XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+ if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE)
+ elog(PANIC, "testcustomrmgrs_redo: unknown op code %u", info);
+}
+
+bool
+testcustomrmgrs_is_record_decodable(uint8 info)
+{
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_TEST_CUSTOM_RMGRS_MESSAGE:
+ return true;
+ default:
+ elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record
type: %u",
+ info);
+ }
+}
+
/*
* SQL function for writing a simple message into WAL with the help of custom
* WAL resource manager.