Dear Bharath,

While checking further, I found some problems in your PoC.

1. rm_is_record_decodable() returns true when WAL records are decodable.
   Based on that, shouldn't is_valid be false when the function returns true?
   E.g., XLOG_HEAP_INSERT is currently accepted by the PoC.
2. XLOG_CHECKPOINT_SHUTDOWN and XLOG_RUNNING_XACTS should return false, because
   these records may be generated during the upgrade itself and are acceptable.
3. Bit operations are used to extract the WAL record type, but the correct mask
   depends on the rmgr. E.g., XLOG uses XLR_INFO_MASK, but XACT uses
   XLOG_XACT_OPMASK.
4. A record with info "XLOG_HEAP_INSERT | XLOG_HEAP_INIT_PAGE" may be inserted,
   but it is not handled.

Regarding 2., maybe we should document something like "rm_is_record_decodable
must return true if decoding the record modifies the reorderbuffer, and false
otherwise." If so, the return values for XLOG_END_OF_RECOVERY and
XLOG_HEAP2_NEW_CID should also be changed.

I attached a patch that fixes the above. What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

diff --git a/src/backend/access/transam/rmgr.c 
b/src/backend/access/transam/rmgr.c
index 001bdf3535..850ba7829a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr)
                                 errdetail("Custom resource manager \"%s\" 
already registered with the same ID.",
                                                   RmgrTable[rmid].rm_name)));
 
+       if (rmgr->rm_decode != NULL && rmgr->rm_is_record_decodable == NULL)
+               ereport(ERROR,
+                               (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+                                errdetail("Custom resource managers that define rm_decode must also define rm_is_record_decodable.")));
+
        /* check for existing rmgr with the same name */
        for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
        {
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 60d26ae015..72a542a06b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -214,11 +214,10 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 xlog_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_CHECKPOINT_SHUTDOWN:
                case XLOG_END_OF_RECOVERY:
-                       return true;
                case XLOG_CHECKPOINT_ONLINE:
                case XLOG_PARAMETER_CHANGE:
                case XLOG_NOOP:
@@ -401,7 +400,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 xact_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & XLOG_XACT_OPMASK)
        {
                case XLOG_XACT_COMMIT:
                case XLOG_XACT_COMMIT_PREPARED:
@@ -471,10 +470,9 @@ standby_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 bool
 standy_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_RUNNING_XACTS:
-                       return true;
                case XLOG_STANDBY_LOCK:
                case XLOG_INVALIDATIONS:
                        return false;
@@ -550,11 +548,11 @@ heap2_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 bool
 heap2_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & XLOG_HEAP_OPMASK)
        {
                case XLOG_HEAP2_MULTI_INSERT:
-               case XLOG_HEAP2_NEW_CID:
                        return true;
+               case XLOG_HEAP2_NEW_CID:
                case XLOG_HEAP2_REWRITE:
                case XLOG_HEAP2_FREEZE_PAGE:
                case XLOG_HEAP2_PRUNE:
@@ -661,9 +659,10 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 heap_is_record_decodable(uint8 info)
 {
-       switch (info)
+       /* Masking with XLOG_HEAP_OPMASK also clears XLOG_HEAP_INIT_PAGE. */
+       switch (info & XLOG_HEAP_OPMASK)
        {
                case XLOG_HEAP_INSERT:
                case XLOG_HEAP_HOT_UPDATE:
                case XLOG_HEAP_UPDATE:
                case XLOG_HEAP_DELETE:
@@ -782,7 +781,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 bool
 logicalmsg_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_LOGICAL_MESSAGE:
                        return true;
diff --git a/src/backend/utils/adt/pg_upgrade_support.c 
b/src/backend/utils/adt/pg_upgrade_support.c
index cfd3e448b1..52084dc644 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -320,19 +320,18 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
        while (is_valid && ReadNextXLogRecord(xlogreader))
        {
                RmgrData        rmgr;
-               RmgrIds         rmid;
-               uint8           info;
-
-               /* Check the type of WAL */
-               rmid = XLogRecGetRmid(xlogreader);
-               info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
 
                if (initial_record)
                {
                        /*
-                        * Initial record must be either 
XLOG_CHECKPOINT_SHUTDOWN or
-                        * XLOG_SWITCH.
+                        * Verify that the initial record is either
+                        * XLOG_CHECKPOINT_SHUTDOWN or XLOG_SWITCH. Both record types
+                        * belong to the RM_XLOG_ID rmgr, so masking with XLR_INFO_MASK
+                        * is sufficient here.
                         */
+                       RmgrIds         rmid = XLogRecGetRmid(xlogreader);
+                       uint8           info = XLogRecGetInfo(xlogreader) & 
~XLR_INFO_MASK;
+
                        is_valid = is_xlog_record_type(rmid, info, RM_XLOG_ID, 
XLOG_CHECKPOINT_SHUTDOWN) ||
                                is_xlog_record_type(rmid, info, RM_XLOG_ID, 
XLOG_SWITCH);
 
@@ -350,11 +349,14 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
                if (rmgr.rm_decode != NULL)
                {
                        if (rmgr.rm_is_record_decodable != NULL)
-                               is_valid = rmgr.rm_is_record_decodable(info);
+                       {
+                               /* If the record is decodable, the upgrade 
should fail */
+                               is_valid = 
!rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader));
+                       }
                        else
                                ereport(ERROR,
                                                errmsg("cannot check logical 
decodability for resource manager \"%s\" with ID %d",
-                                                          rmgr.rm_name, rmid),
+                                                          rmgr.rm_name, 
XLogRecGetRmid(xlogreader)),
                                                errdetail("Logical decodability 
callback is not defined for the resource manager."));
                }
 
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index a471e77a7c..8d6c7ff0ca 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -21,6 +21,9 @@
  * entries should be added at the end, to avoid changing IDs of existing
  * entries.
  *
+ * rm_is_record_decodable must return true if decoding the record modifies the
+ * reorderbuffer, and false otherwise.
+ *
  * Changes to this list possibly need an XLOG_PAGE_MAGIC bump.
  */
 
diff --git a/src/include/access/xlog_internal.h 
b/src/include/access/xlog_internal.h
index 6e113ef53d..44e10c0a94 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -342,6 +342,9 @@ struct XLogRecordBuffer;
  * rm_mask takes as input a page modified by the resource manager and masks
  * out bits that shouldn't be flagged by wal_consistency_checking.
  *
+ * If a resource manager implements rm_decode, it must also implement
+ * rm_is_record_decodable.
+ *
  * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
  * NULL, the corresponding RmgrTable entry is considered invalid.
  */
@@ -356,7 +359,7 @@ typedef struct RmgrData
        void            (*rm_mask) (char *pagedata, BlockNumber blkno);
        void            (*rm_decode) (struct LogicalDecodingContext *ctx,
                                                          struct 
XLogRecordBuffer *buf);
-       bool            (*rm_is_record_decodable) (uint8 type);
+       bool            (*rm_is_record_decodable) (uint8 info);
 } RmgrData;
 
 extern PGDLLIMPORT RmgrData RmgrTable[];
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 3885ce671d..d8a912296c 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer
        XLogReaderState *record;
 } XLogRecordBuffer;
 
+/*
+ * Decode functions for resource managers.
+ *
+ * Note that if a rmgr has rm_decode function, it must have
+ * rm_is_record_decodable function as well.
+ */
 extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf);
 
+/* is_record_decodable functions */
 extern bool xlog_is_record_decodable(uint8 info);
 extern bool xact_is_record_decodable(uint8 info);
 extern bool standy_is_record_decodable(uint8 info);
diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c 
b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
index a304ba54bb..7ac90633f4 100644
--- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
+++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
@@ -10,7 +10,7 @@
  *             src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
  *
  * Custom WAL resource manager for records containing a simple textual
- * payload, no-op redo, and no decoding.
+ * payload, no-op redo and decode.
  *
  * -------------------------------------------------------------------------
  */
@@ -21,6 +21,7 @@
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
 #include "fmgr.h"
+#include "replication/decode.h"
 #include "utils/pg_lsn.h"
 #include "varatt.h"
 
@@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message
 void           testcustomrmgrs_redo(XLogReaderState *record);
 void           testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record);
 const char *testcustomrmgrs_identify(uint8 info);
+void           testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+                                                                  struct 
XLogRecordBuffer *buf);
+bool           testcustomrmgrs_is_record_decodable(uint8 info);
 
 static const RmgrData testcustomrmgrs_rmgr = {
        .rm_name = TESTCUSTOMRMGRS_NAME,
        .rm_redo = testcustomrmgrs_redo,
        .rm_desc = testcustomrmgrs_desc,
-       .rm_identify = testcustomrmgrs_identify
+       .rm_identify = testcustomrmgrs_identify,
+       .rm_decode = testcustomrmgrs_decode,
+       .rm_is_record_decodable = testcustomrmgrs_is_record_decodable
 };
 
 /*
@@ -111,6 +117,40 @@ testcustomrmgrs_identify(uint8 info)
        return NULL;
 }
 
+/*
+ * Logical decoding callback.  It performs no decoding work; it only
+ * validates the record type.
+ */
+void
+testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+                                          struct XLogRecordBuffer *buf)
+{
+       XLogReaderState *r = buf->record;
+       uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+       if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE)
+               elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record type: %u", info);
+}
+
+/*
+ * Report whether a record of this rmgr is decodable.  info may still carry
+ * XLR_INFO_MASK bits, which are stripped before checking the record type.
+ */
+bool
+testcustomrmgrs_is_record_decodable(uint8 info)
+{
+       switch (info & ~XLR_INFO_MASK)
+       {
+               case XLOG_TEST_CUSTOM_RMGRS_MESSAGE:
+                       return true;
+               default:
+                       elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record type: %u",
+                                info);
+       }
+
+       return false;                           /* keep compiler quiet */
+}
+
 /*
  * SQL function for writing a simple message into WAL with the help of custom
  * WAL resource manager.

Reply via email to