Dear Amit,

Thank you for reviewing! PSA the new version of the patch set.

> Few comments:
> 1. Why is the FPI record (XLOG_FPI_FOR_HINT) not considered a record
> to be ignored? This can be generated during reading system tables.

Oh, I just missed it. It was mentioned in the comment atop the function, but not
added to the check itself. Now it is added to the allow-list.

> 2.
> +binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS)
> {
> ...
> + if (initial_record)
> + {
> + /* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */
> + if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
> +   XLOG_CHECKPOINT_SHUTDOWN))
> + result = false;
> ...
> + if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
> XLOG_CHECKPOINT_SHUTDOWN) &&
> + !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
> XLOG_CHECKPOINT_ONLINE) &&
> + !CHECK_WAL_RECORD(rmid, info, RM_STANDBY_ID,
> XLOG_RUNNING_XACTS) &&
> + !CHECK_WAL_RECORD(rmid, info, RM_HEAP2_ID, XLOG_HEAP2_PRUNE))
> + result = false;
> ...
> }
> 
> Isn't it better to immediately return false if any unexpected WAL is
> found? This will avoid reading unnecessary WAL

IIUC we already exit the loop as soon as result == false, so we do not read
unnecessary WAL. See the condition below. I used this approach because
private_data and xlogreader should be pfree()'d as cleanup.

```
        /* Loop until all WALs are read, or unexpected record is found */
        while (result && ReadNextXLogRecord(xlogreader))
        {
```

> 3.
> +Datum
> +binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS)
> +{
> ...
> +
> + CHECK_IS_BINARY_UPGRADE;
> +
> + /* Quick exit if the given lsn is larger than current one */
> + if (start_lsn >= curr_lsn)
> + PG_RETURN_BOOL(true);
> 
> Why do you return true here? My understanding was if the first record
> is not a shutdown checkpoint record then it should fail, if that is
> not true then I think we need to explain the same in comments.

I wondered what it should return, because this is unexpected input for us (note
that this function can be used only for upgrade purposes). But yes, the first
WAL record read must be XLOG_CHECKPOINT_SHUTDOWN, so I changed it as you said.

Also, I self-reviewed the patch again and reworded some comments.

BTW, 0002 ports some functions from pg_walinspect, which may not be elegant.
The coupling between core and extensions should also be kept low. So I made
another patch which does not port anything and implements similar
functionality instead.
I called the patch 0003, but it applies atop 0001 (not atop 0002). To keep cfbot
happy, it is attached as a txt file.
Could you please tell me which you prefer, 0002 or 0003?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

From d3fa36f3bc7f8ef4c0c541742ac8ad6d9eee5f09 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Thu, 14 Sep 2023 06:01:40 +0000
Subject: [PATCH v38] Another one: Reads all WAL records ahead
 confirmed_flush_lsn

---
 doc/src/sgml/ref/pgupgrade.sgml               |   5 +-
 src/backend/utils/adt/pg_upgrade_support.c    | 130 ++++++++++++++++++
 src/bin/pg_upgrade/check.c                    |   8 +-
 src/bin/pg_upgrade/controldata.c              |  39 ------
 src/bin/pg_upgrade/info.c                     |   6 +-
 src/bin/pg_upgrade/pg_upgrade.h               |   1 -
 .../t/003_logical_replication_slots.pl        |  20 +++
 src/include/catalog/pg_proc.dat               |   6 +
 8 files changed, 164 insertions(+), 51 deletions(-)

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 4e2281bae4..2588d6d7b8 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -418,10 +418,7 @@ make prefix=/usr/local/pgsql.new install
      </listitem>
      <listitem>
       <para>
-       <link 
linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>confirmed_flush_lsn</structfield>
-       of all slots on the old cluster must be the same as the latest
-       checkpoint location. This ensures that all the data has been replicated
-       before the upgrade.
+       The old cluster has replicated all the changes to subscribers.
       </para>
      </listitem>
      <listitem>
diff --git a/src/backend/utils/adt/pg_upgrade_support.c 
b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f..340dc180be 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,14 +11,22 @@
 
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
+#include "storage/standbydefs.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
 
 
 #define CHECK_IS_BINARY_UPGRADE                                                
                        \
@@ -29,6 +37,9 @@ do {                                                          
                                                        \
                                 errmsg("function can only be called when 
server is in binary upgrade mode"))); \
 } while (0)
 
+#define CHECK_WAL_RECORD(rmgrid, info, expected_rmgrid, expected_info) \
+       ((rmgrid) == (expected_rmgrid) && (info) == (expected_info))
+
 Datum
 binary_upgrade_set_next_pg_tablespace_oid(PG_FUNCTION_ARGS)
 {
@@ -261,3 +272,130 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
        PG_RETURN_VOID();
 }
+
+/*
+ * Return true if we didn't find any unexpected WAL record, false otherwise.
+ *
+ * This function is used to verify that there are no WAL records (except some
+ * types) after confirmed_flush_lsn of logical slots, which means all the
+ * changes were replicated to the subscriber. There is a possibility that some
+ * WALs are inserted after logical walsenders exit, so such types would be
+ * ignored.
+ *
+ * The first record read from start_lsn must be XLOG_CHECKPOINT_SHUTDOWN,
+ * which is inserted after the walsender exits; if it is absent, or if no
+ * record can be read at all, false is returned.  After that record, the
+ * following types may be generated during pg_upgrade --check, so they are
+ * ignored too.
+ *
+ *             - XLOG_CHECKPOINT_SHUTDOWN
+ *             - XLOG_CHECKPOINT_ONLINE
+ *             - XLOG_RUNNING_XACTS
+ *             - XLOG_FPI_FOR_HINT
+ *             - XLOG_HEAP2_PRUNE
+ */
+Datum
+binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr        start_lsn = PG_GETARG_LSN(0);
+       XLogReaderState *xlogreader;
+       bool                    initial_record = true;
+       bool                    result = true;
+       ReadLocalXLogPageNoWaitPrivate *private_data;
+
+       CHECK_IS_BINARY_UPGRADE;
+
+       /*
+        * At least the shutdown checkpoint record must follow the given LSN, so
+        * report failure if no WAL has been flushed beyond it.
+        */
+       if (start_lsn >= GetFlushRecPtr(NULL))
+               PG_RETURN_BOOL(false);
+
+       private_data = (ReadLocalXLogPageNoWaitPrivate *)
+               palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+       xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+                                       XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+                                                  .segment_open = &wal_segment_open,
+                                                  .segment_close = &wal_segment_close),
+                                       private_data);
+
+       if (xlogreader == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory"),
+                                errdetail("Failed while allocating a WAL reading processor.")));
+
+       XLogBeginRead(xlogreader, start_lsn);
+
+       /* Loop until all WALs are read, or an unexpected record is found */
+       while (result)
+       {
+               RmgrIds            rmid;
+               uint8              info;
+               char       *errormsg;
+               XLogRecord *record;
+
+               CHECK_FOR_INTERRUPTS();
+
+               record = XLogReadRecord(xlogreader, &errormsg);
+
+               if (record == NULL)
+               {
+                       /* NULL is also returned when the end of WAL is reached */
+                       if (private_data->end_of_wal)
+                               break;
+
+                       if (errormsg)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read WAL at %X/%X: %s",
+                                                               LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+                       else
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read WAL at %X/%X",
+                                                               LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+               }
+
+               /* Check the type of WAL */
+               rmid = XLogRecGetRmid(xlogreader);
+               info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+               if (initial_record)
+               {
+                       /* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */
+                       if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
+                                                                 XLOG_CHECKPOINT_SHUTDOWN))
+                               result = false;
+
+                       initial_record = false;
+
+                       continue;
+               }
+
+               /*
+                * XXX: There is a possibility that following records may be
+                * generated during the upgrade.
+                */
+               if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_SHUTDOWN) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_ONLINE) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_FPI_FOR_HINT) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_STANDBY_ID, XLOG_RUNNING_XACTS) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_HEAP2_ID, XLOG_HEAP2_PRUNE))
+                       result = false;
+       }
+
+       /*
+        * If the loop ended before any record was read, the expected shutdown
+        * checkpoint is missing; treat that as a failure too.
+        */
+       if (initial_record)
+               result = false;
+
+       pfree(private_data);
+       XLogReaderFree(xlogreader);
+
+       PG_RETURN_BOOL(result);
+}
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index b1424fdf9c..df1ce67fc0 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1480,8 +1480,8 @@ check_new_cluster_logical_replication_slots(void)
  * Following points are checked:
  *
  *     - All logical replication slots are usable.
- *     - All logical replication slots consumed all WALs, except a
- *       CHECKPOINT_SHUTDOWN record.
+ *     - All logical replication slots consumed all WALs, except some 
acceptable
+ *       types.
  */
 static void
 check_old_cluster_for_valid_slots(bool live_check)
@@ -1521,8 +1521,8 @@ check_old_cluster_for_valid_slots(bool live_check)
                        }
 
                        /*
-                        * Do additional checks to ensure that confirmed_flush 
LSN of all
-                        * the slots is the same as the latest checkpoint 
location.
+                        * Do additional checks to ensure that all logical 
replication
+                        * slots have reached the current WAL position.
                         *
                         * Note: This can be satisfied only when the old 
cluster has been
                         * shut down, so we skip this for live checks.
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index f8f823e2be..4beb65ab22 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,45 +169,6 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                                }
                                got_cluster_state = true;
                        }
-
-                       else if ((p = strstr(bufin, "Latest checkpoint 
location:")) != NULL)
-                       {
-                               /*
-                                * Read the latest checkpoint location if the 
cluster is PG17
-                                * or later. This is used for upgrading logical 
replication
-                                * slots. Currently, we need it only for the 
old cluster but
-                                * for simplicity chose not to have additional 
checks.
-                                */
-                               if (GET_MAJOR_VERSION(cluster->major_version) 
>= 1700)
-                               {
-                                       char       *slash = NULL;
-                                       uint32          upper_lsn,
-                                                               lower_lsn;
-
-                                       p = strchr(p, ':');
-
-                                       if (p == NULL || strlen(p) <= 1)
-                                               pg_fatal("%d: controldata 
retrieval problem", __LINE__);
-
-                                       p++;            /* remove ':' char */
-
-                                       p = strpbrk(p, "01234567890ABCDEF");
-
-                                       if (p == NULL || strlen(p) <= 1)
-                                               pg_fatal("%d: controldata 
retrieval problem", __LINE__);
-
-                                       /*
-                                        * The upper and lower part of LSN must 
be read separately
-                                        * because it is stored as in %X/%X 
format.
-                                        */
-                                       upper_lsn = strtoul(p, &slash, 16);
-                                       lower_lsn = strtoul(++slash, NULL, 16);
-
-                                       /* And combine them */
-                                       cluster->controldata.chkpnt_latest =
-                                               ((uint64) upper_lsn << 32) | 
lower_lsn;
-                               }
-                       }
                }
 
                rc = pclose(output);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index f7b0deca87..5d25d1604e 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -647,12 +647,12 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * removed.
         */
        res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
-                                                       "(confirmed_flush_lsn = 
'%X/%X') as caught_up, conflicting as invalid "
+                                                       
"pg_catalog.binary_upgrade_validate_wal_record_types_after_lsn(confirmed_flush_lsn)
 as caught_up, "
+                                                       "conflicting as invalid 
"
                                                        "FROM 
pg_catalog.pg_replication_slots "
                                                        "WHERE slot_type = 
'logical' AND "
                                                        "database = 
current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       
LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+                                                       "temporary IS FALSE;");
 
        num_slots = PQntuples(res);
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index f5ce6c3b4d..8a7f56831e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -246,7 +246,6 @@ typedef struct
        bool            date_is_int;
        bool            float8_pass_by_value;
        uint32          data_checksum_version;
-       XLogRecPtr      chkpnt_latest;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl 
b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 01cb04ca12..b91fb2f88f 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -169,6 +169,26 @@ $subscriber->wait_for_subscription_sync($old_publisher, 
'sub');
 $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
 $old_publisher->stop;
 
+# Dry run, successful check is expected. This is not live check, so shutdown
+# checkpoint record would be inserted. We want to test that
+# binary_upgrade_validate_wal_record_types_after_lsn() skips the WAL and then
+# upcoming pg_upgrade would succeed.
+command_ok(
+       [
+               'pg_upgrade', '--no-sync',
+               '-d',         $old_publisher->data_dir,
+               '-D',         $new_publisher->data_dir,
+               '-b',         $bindir,
+               '-B',         $bindir,
+               '-s',         $new_publisher->host,
+               '-p',         $old_publisher->port,
+               '-P',         $new_publisher->port,
+               $mode,        '--check'
+       ],
+       'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ removed after pg_upgrade success");
+
 # Actual run, successful upgrade is expected
 command_ok(
        [
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f3d843222b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11370,6 +11370,12 @@
   proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_validate_wal_record_types_after_lsn',
+  provolatile => 'v', proparallel => 'u', prorettype => 'bool',
+  proargtypes => 'pg_lsn',
+  proargnames => '{start_lsn}',
+  prosrc => 'binary_upgrade_validate_wal_record_types_after_lsn' },
 
 # conversion functions
 { oid => '4302',
-- 
2.27.0

Attachment: v38-0001-pg_upgrade-Allow-to-replicate-logical-replicatio.patch
Description: v38-0001-pg_upgrade-Allow-to-replicate-logical-replicatio.patch

Attachment: v38-0002-Use-binary_upgrade_validate_wal_record_types_aft.patch
Description: v38-0002-Use-binary_upgrade_validate_wal_record_types_aft.patch

Reply via email to