> Thank you for reviewing! PSA new version patch set.

Sorry, I attached the wrong patch. PSA the correct ones.
XLOG_PARAMETER_CHANGE may be generated when GUC parameters are changed just
before the upgrade, so I added it to the list of ignored record types.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Attachment: v38-0001-pg_upgrade-Allow-to-replicate-logical-replicatio.patch
Description: v38-0001-pg_upgrade-Allow-to-replicate-logical-replicatio.patch

Attachment: v38-0002-Use-binary_upgrade_validate_wal_record_types_aft.patch
Description: v38-0002-Use-binary_upgrade_validate_wal_record_types_aft.patch

From ff250bb4bd467aaf8b09a27ab9edc93a3d9bb9bc Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Thu, 14 Sep 2023 06:01:40 +0000
Subject: [PATCH v38] Another one: Reads all WAL records ahead
 confirmed_flush_lsn

---
 doc/src/sgml/ref/pgupgrade.sgml               |   5 +-
 src/backend/utils/adt/pg_upgrade_support.c    | 133 ++++++++++++++++++
 src/bin/pg_upgrade/check.c                    |   8 +-
 src/bin/pg_upgrade/controldata.c              |  39 -----
 src/bin/pg_upgrade/info.c                     |   6 +-
 src/bin/pg_upgrade/pg_upgrade.h               |   1 -
 .../t/003_logical_replication_slots.pl        |  20 +++
 src/include/catalog/pg_proc.dat               |   6 +
 8 files changed, 167 insertions(+), 51 deletions(-)

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 4e2281bae4..2588d6d7b8 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -418,10 +418,7 @@ make prefix=/usr/local/pgsql.new install
      </listitem>
      <listitem>
       <para>
-       <link 
linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>confirmed_flush_lsn</structfield>
-       of all slots on the old cluster must be the same as the latest
-       checkpoint location. This ensures that all the data has been replicated
-       before the upgrade.
+       The old cluster has replicated all the changes to subscribers.
       </para>
      </listitem>
      <listitem>
diff --git a/src/backend/utils/adt/pg_upgrade_support.c 
b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f..5b58769b5e 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,14 +11,22 @@
 
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
+#include "storage/standbydefs.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
 
 
 #define CHECK_IS_BINARY_UPGRADE                                                
                        \
@@ -29,6 +37,9 @@ do {                                                          
                                                        \
                                 errmsg("function can only be called when 
server is in binary upgrade mode"))); \
 } while (0)
 
+#define CHECK_WAL_RECORD(rmgrid, info, expected_rmgrid, expected_info) \
+       ((rmgrid) == (expected_rmgrid) && (info) == (expected_info))
+
 Datum
 binary_upgrade_set_next_pg_tablespace_oid(PG_FUNCTION_ARGS)
 {
@@ -261,3 +272,125 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
        PG_RETURN_VOID();
 }
+
+/*
+ * Return true if the first record is a shutdown checkpoint and no unexpected
+ * WAL record follows it; false otherwise (including when no record is read).
+ * This function is used to verify that there are no WAL records (except some
+ * types) after confirmed_flush_lsn of logical slots, which means all the
+ * changes were replicated to the subscriber. There is a possibility that some
+ * WAL records are inserted after logical walsenders exit, so such types are
+ * ignored.
+ *
+ * XLOG_CHECKPOINT_SHUTDOWN is ignored because it is inserted after the
+ * walsender exits. XLOG_PARAMETER_CHANGE is also ignored because it may be
+ * inserted when GUC parameters are changed just before doing the upgrade.
+ * Moreover, the following types of records may be generated during
+ * pg_upgrade --check, so they are ignored too.
+ *
+ *             - XLOG_CHECKPOINT_ONLINE
+ *             - XLOG_RUNNING_XACTS
+ *             - XLOG_FPI_FOR_HINT
+ *             - XLOG_HEAP2_PRUNE
+ */
+Datum
+binary_upgrade_validate_wal_record_types_after_lsn(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr        start_lsn = PG_GETARG_LSN(0);
+       XLogReaderState *xlogreader;
+       bool                    initial_record = true;
+       bool                    result = true;
+       ReadLocalXLogPageNoWaitPrivate *private_data;
+
+       CHECK_IS_BINARY_UPGRADE;
+
+       /* Quick exit if no WAL has been flushed beyond the given LSN */
+       if (start_lsn >= GetFlushRecPtr(NULL))
+               PG_RETURN_BOOL(false);
+
+       private_data = (ReadLocalXLogPageNoWaitPrivate *)
+               palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+       xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+                                                                       XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+                                                                                          .segment_open = &wal_segment_open,
+                                                                                          .segment_close = &wal_segment_close),
+                                                                       private_data);
+
+       if (xlogreader == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory"),
+                                errdetail("Failed while allocating a WAL reading processor.")));
+
+       XLogBeginRead(xlogreader, start_lsn);
+
+       /* Loop until all WAL is read or an unexpected record is found */
+       while (result)
+       {
+               RmgrIds            rmid;
+               uint8              info;
+               char       *errormsg;
+               XLogRecord *record;
+
+               CHECK_FOR_INTERRUPTS();
+
+               record = XLogReadRecord(xlogreader, &errormsg);
+
+               if (record == NULL)
+               {
+                       ReadLocalXLogPageNoWaitPrivate *check_data;
+
+                       /* Stop at the end of WAL; otherwise report the read error */
+                       check_data = (ReadLocalXLogPageNoWaitPrivate *)
+                               xlogreader->private_data;
+
+                       if (check_data->end_of_wal)
+                               break;
+
+                       if (errormsg)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read WAL at %X/%X: %s",
+                                                               LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+                       else
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read WAL at %X/%X",
+                                                               LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+               }
+
+               /* Check the type of WAL */
+               rmid = XLogRecGetRmid(xlogreader);
+               info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+               if (initial_record)
+               {
+                       /* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */
+                       if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID,
+                                                                 XLOG_CHECKPOINT_SHUTDOWN))
+                               result = false;
+
+                       initial_record = false;
+
+                       continue;
+               }
+
+               /*
+                * XXX: The following records may be generated after walsenders
+                * exit (e.g. during pg_upgrade --check), so they are acceptable.
+                */
+               if (!CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_SHUTDOWN) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_ONLINE) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_PARAMETER_CHANGE) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_XLOG_ID, XLOG_FPI_FOR_HINT) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_STANDBY_ID, XLOG_RUNNING_XACTS) &&
+                       !CHECK_WAL_RECORD(rmid, info, RM_HEAP2_ID, XLOG_HEAP2_PRUNE))
+                               result = false;
+       }
+
+       pfree(xlogreader->private_data);
+       XLogReaderFree(xlogreader);
+
+       PG_RETURN_BOOL(result && !initial_record);
+}
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index b1424fdf9c..df1ce67fc0 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1480,8 +1480,8 @@ check_new_cluster_logical_replication_slots(void)
  * Following points are checked:
  *
  *     - All logical replication slots are usable.
- *     - All logical replication slots consumed all WALs, except a
- *       CHECKPOINT_SHUTDOWN record.
+ *     - All logical replication slots consumed all WALs, except some 
acceptable
+ *       types.
  */
 static void
 check_old_cluster_for_valid_slots(bool live_check)
@@ -1521,8 +1521,8 @@ check_old_cluster_for_valid_slots(bool live_check)
                        }
 
                        /*
-                        * Do additional checks to ensure that confirmed_flush 
LSN of all
-                        * the slots is the same as the latest checkpoint 
location.
+                        * Do additional checks to ensure that all logical 
replication
+                        * slots have reached the current WAL position.
                         *
                         * Note: This can be satisfied only when the old 
cluster has been
                         * shut down, so we skip this for live checks.
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index f8f823e2be..4beb65ab22 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,45 +169,6 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                                }
                                got_cluster_state = true;
                        }
-
-                       else if ((p = strstr(bufin, "Latest checkpoint 
location:")) != NULL)
-                       {
-                               /*
-                                * Read the latest checkpoint location if the 
cluster is PG17
-                                * or later. This is used for upgrading logical 
replication
-                                * slots. Currently, we need it only for the 
old cluster but
-                                * for simplicity chose not to have additional 
checks.
-                                */
-                               if (GET_MAJOR_VERSION(cluster->major_version) 
>= 1700)
-                               {
-                                       char       *slash = NULL;
-                                       uint32          upper_lsn,
-                                                               lower_lsn;
-
-                                       p = strchr(p, ':');
-
-                                       if (p == NULL || strlen(p) <= 1)
-                                               pg_fatal("%d: controldata 
retrieval problem", __LINE__);
-
-                                       p++;            /* remove ':' char */
-
-                                       p = strpbrk(p, "01234567890ABCDEF");
-
-                                       if (p == NULL || strlen(p) <= 1)
-                                               pg_fatal("%d: controldata 
retrieval problem", __LINE__);
-
-                                       /*
-                                        * The upper and lower part of LSN must 
be read separately
-                                        * because it is stored as in %X/%X 
format.
-                                        */
-                                       upper_lsn = strtoul(p, &slash, 16);
-                                       lower_lsn = strtoul(++slash, NULL, 16);
-
-                                       /* And combine them */
-                                       cluster->controldata.chkpnt_latest =
-                                               ((uint64) upper_lsn << 32) | 
lower_lsn;
-                               }
-                       }
                }
 
                rc = pclose(output);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index f7b0deca87..5d25d1604e 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -647,12 +647,12 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * removed.
         */
        res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
-                                                       "(confirmed_flush_lsn = 
'%X/%X') as caught_up, conflicting as invalid "
+                                                       
"pg_catalog.binary_upgrade_validate_wal_record_types_after_lsn(confirmed_flush_lsn)
 as caught_up, "
+                                                       "conflicting as invalid 
"
                                                        "FROM 
pg_catalog.pg_replication_slots "
                                                        "WHERE slot_type = 
'logical' AND "
                                                        "database = 
current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       
LSN_FORMAT_ARGS(old_cluster.controldata.chkpnt_latest));
+                                                       "temporary IS FALSE;");
 
        num_slots = PQntuples(res);
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index f5ce6c3b4d..8a7f56831e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -246,7 +246,6 @@ typedef struct
        bool            date_is_int;
        bool            float8_pass_by_value;
        uint32          data_checksum_version;
-       XLogRecPtr      chkpnt_latest;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl 
b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 01cb04ca12..b91fb2f88f 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -169,6 +169,26 @@ $subscriber->wait_for_subscription_sync($old_publisher, 
'sub');
 $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
 $old_publisher->stop;
 
+# Dry run; a successful check is expected. This is not a live check, so a
+# shutdown checkpoint record has been written after confirmed_flush_lsn. Test
+# that binary_upgrade_validate_wal_record_types_after_lsn() accepts that
+# record so that the subsequent pg_upgrade can succeed.
+command_ok(
+       [
+               'pg_upgrade', '--no-sync',
+               '-d',         $old_publisher->data_dir,
+               '-D',         $new_publisher->data_dir,
+               '-b',         $bindir,
+               '-B',         $bindir,
+               '-s',         $new_publisher->host,
+               '-p',         $old_publisher->port,
+               '-P',         $new_publisher->port,
+               $mode,        '--check'
+       ],
+       'run of pg_upgrade --check for old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ removed after pg_upgrade --check success");
+
 # Actual run, successful upgrade is expected
 command_ok(
        [
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f3d843222b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11370,6 +11370,12 @@
   proname => 'binary_upgrade_set_next_pg_tablespace_oid', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
+{ oid => '8046', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_validate_wal_record_types_after_lsn',
+  provolatile => 'v', proparallel => 'u',
+  prorettype => 'bool',
+  proargtypes => 'pg_lsn',
+  prosrc => 'binary_upgrade_validate_wal_record_types_after_lsn' },
 
 # conversion functions
 { oid => '4302',
-- 
2.27.0

Reply via email to