On Sat, Oct 14, 2023 at 10:45 AM Hayato Kuroda (Fujitsu) wrote:
>
> Here is a new patch.
>
> Previously I wrote:
> > Based on the above idea, I made a new version of the patch in which some
> > functionality was exported from pg_resetwal. In this approach, pg_upgrade
> > itself removes WAL files and then creates the logical slots, and then
> > pg_resetwal is called with a new option, --no-switch, which avoids switching
> > to a new WAL segment file. The option is used only for upgrade purposes, so
> > it is not described in the documentation or in usage(). This option would
> > not be required if pg_resetwal -o did not discard WAL records. Please see
> > the forked thread [1].
>
> But for now, these changes have been reverted because changing the behavior of
> pg_resetwal -o may be a bit risky. That code has been in place for more than
> ten years, so we should be careful about modifying it.
> Also, I cannot think of any problems if the slots are created after running
> pg_resetwal. Background processes would not generate decodable changes
> (listed in [1]), and background workers from extensions can be ignored [2].
> Depending on the discussion in the forked thread [3], and if it is accepted,
> we will apply these changes again.
>
Yeah, I think introducing additional complexity unless it is really
required sounds a bit scary to me as well. BTW, please find attached
some cosmetic changes.
One minor additional comment:
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
Why do we need to set wal_level to logical for the subscriber?
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 4144a43afd..cfa955a679 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -618,9 +618,9 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
/*
- * We can also skip decoding when in 'fast_forward' mode. This check must
- * be last because we don't want to set that processing_required flag
- * unnecessarily.
+ * We also skip decoding in 'fast_forward' mode. This check must be last
+ * because we don't want to set the processing_required flag unless
+ * we have a decodable message.
*/
if (ctx->fast_forward)
{
@@ -1307,8 +1307,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
return true;
/*
- * We can also skip decoding when in 'fast_forward' mode. In passing set
- * the 'processing_required' flag to indicate, were it not for this mode,
+ * We also skip decoding in 'fast_forward' mode. In passing set the
+ * 'processing_required' flag to indicate, were it not for this mode,
* processing *would* have been required.
*/
if (ctx->fast_forward)
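The check order described in the decode.c comments above (ignore irrelevant records first, and test fast_forward last so that processing_required is set only for a record that would really have been decoded) can be illustrated with a small standalone C sketch. The struct, the function name, and the single is_relevant flag standing in for the earlier filters are hypothetical simplifications, not the actual decode.c code.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical, heavily simplified stand-in for LogicalDecodingContext;
 * only the fields needed for this illustration are modeled.
 */
typedef struct SketchDecodingContext
{
    bool fast_forward;        /* advance the slot without emitting output */
    bool processing_required; /* a decodable record was skipped */
    int  changes_emitted;     /* changes passed to the output plugin */
} SketchDecodingContext;

/*
 * Hypothetical decode step for one record. "is_relevant" stands in for the
 * earlier filters (for example, records for another database), which
 * return without touching processing_required.
 */
void
sketch_decode_record(SketchDecodingContext *ctx, bool is_relevant)
{
    /* Records that decoding would ignore anyway: nothing to flag. */
    if (!is_relevant)
        return;

    /*
     * The fast_forward check comes last, so the flag is set only for a
     * record that would otherwise have been decoded.
     */
    if (ctx->fast_forward)
    {
        ctx->processing_required = true;
        return;
    }

    /* Normal mode: hand the change to the output plugin (modeled as a count). */
    ctx->changes_emitted++;
}
```

In this sketch, moving the fast_forward check before the relevance filter would make every record set the flag, so a slot could look as if it had pending changes when it has none.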
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 32869a75ab..e02cd0fa44 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1953,9 +1953,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
}
/*
- * Read to end of WAL starting from the decoding slot's restart_lsn. Return
- * true if any meaningful/decodable WAL records are encountered, otherwise
- * false.
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
*
* Although this function is currently used only during pg_upgrade, there are
* no reasons to restrict it, so IsBinaryUpgrade is not checked here.
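As a rough illustration of the scan described in the logical.c comment above, here is a standalone C sketch that walks records from the slot's restart_lsn up to the end of WAL and stops as soon as one decodable record is seen. The record type, the function name, and the use of plain integers for LSNs are hypothetical simplifications; the patch presumably reads WAL through a logical decoding context in fast_forward mode and relies on the processing_required flag instead of iterating over an array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model: a WAL record is just a start position plus a flag. */
typedef struct SketchRecord
{
    uint64_t lsn;       /* start position of the record */
    bool     decodable; /* would logical decoding emit a change for it? */
} SketchRecord;

/*
 * Hypothetical sketch: scan records in [restart_lsn, end_of_wal) and report
 * whether any decodable record exists. One decodable record is enough, so
 * the loop stops as soon as processing_required becomes true.
 */
bool
sketch_has_decodable_wal(const SketchRecord *wal, size_t nrecords,
                         uint64_t restart_lsn, uint64_t end_of_wal)
{
    bool processing_required = false;

    for (size_t i = 0; i < nrecords && !processing_required; i++)
    {
        if (wal[i].lsn < restart_lsn)
            continue;           /* before the slot's restart point */
        if (wal[i].lsn >= end_of_wal)
            break;              /* reached the end of flushed WAL */
        if (wal[i].decodable)
            processing_required = true;
    }
    return processing_required;
}
```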
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 2a831bc397..a3a8ade405 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -274,8 +274,8 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
* Returns true if there are no decodable WAL records after the
* confirmed_flush_lsn. Otherwise false.
*
- * This is a special purpose function to ensure the given slot can be upgraded
- * without data loss.
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
*/
Datum
binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS)
@@ -294,16 +294,10 @@ binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS)
slot_name = PG_GETARG_NAME(0);
- /*
- * Acquire the given slot. There should be no error because the caller has
- * already checked the slot exists.
- */
+ /* Acquire the given slot. */
ReplicationSlotAcquire(NameStr(*slot_name), true);
- /*
- * It's caller's responsibility to check the health of the slot. Upcoming
- * functions assume the restart_lsn points to a valid record.
- */
+ /* Slots must be valid as otherwise we won't be able to scan the WAL. */
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
end_of_wal = GetFlushRecPtr(NULL);
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 123f47a81f..8f3f5585a4 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1541,10 +1541,8 @@ check_new_cluster_logical_replication_slots(void)
/*
* check_old_cluster_for_valid_slots()
*
- * Verify that all the logical slots are usable and have consumed all the WAL
- * before shutdown. The check has already been done in
- * get_old_cluster_logical_slot_infos(), so this function reads the result and
- * reports to the user.
+ * Verify that all the logical slots are valid and have consumed all the WAL
+ * before shutdown.
*/
static void
check_old_cluster_for_valid_slots(bool live_check)
@@ -1607,7 +1605,7 @@ check_old_cluster_for_valid_slots(bool live_check)
fclose(script);
pg_log(PG_REPORT, "fatal");
- pg_fatal("Your installation contains logical replication slots that cannot be upgraded.\n"
+ pg_fatal("Your installation contains invalid logical replication slots.\n"
"These slots can't be copied, so this cluster cannot be upgraded.\n"
"Consider removing invalid slots and/or consuming the pending WAL if any,\n"
"and then restart the upgrade.\n"
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index c56769fe54..5494e69227 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -651,8 +651,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
/*
* Fetch the logical replication slot information. The check whether the
* slot is considered caught up is done by an upgrade function. This
- * regards the slot is caught up if any changes are not found while
- * decoding. See binary_upgrade_slot_has_pending_wal().
+ * regards the slot as caught up if we don't find any decodable changes.
+ * See binary_upgrade_slot_has_pending_wal().
*
* Note that we can't ensure whether the slot is caught up during
* live_check as the new WAL records could be generated.
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 7acdf31d02..3960af4036 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -190,7 +190,12 @@ main(int argc, char **argv)
check_ok();
/*
- * If the old cluster has logical slots, migrate them to a new cluster.
+ * Migrate the logical slots to the new cluster. Note that we need to do
+ * this after resetting WAL because otherwise the required WAL would be
+ * removed and slots would become unusable. There is a possibility that
+ * background processes might generate some WAL before we could create the
+ * slots in the new cluster but we can ignore that WAL as that won't be
+ * required downstream.
*/
if (count_old_cluster_logical_slots())
{
@@ -890,7 +895,6 @@ create_logical_replication_slots(void)
LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
PGconn *conn;
PQExpBuffer query;
- char log_file_name[MAXPGPATH];
/* Skip this database if there are no slots */
if (slot_arr->nslots == 0)
@@ -899,9 +903,6 @@ create_logical_replication_slots(void)
conn = connectToServer(&new_cluster, old_db->db_name);
query = createPQExpBuffer();
- snprintf(log_file_name, sizeof(log_file_name),
- DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
-
pg_log(PG_STATUS, "%s", old_db->db_name);
for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
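The ordering argument in the pg_upgrade.c comment above can be modeled with a toy C sketch: a slot created before WAL is reset points at WAL that no longer exists, while a slot created afterwards does not. The cluster and slot types, the function names, and the rule that a new slot starts at the current end of WAL are all hypothetical simplifications, not pg_upgrade's or pg_resetwal's actual code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the new cluster's WAL state. */
typedef struct SketchCluster
{
    uint64_t oldest_available_lsn; /* WAL before this has been removed */
    uint64_t insert_lsn;           /* current end of WAL */
} SketchCluster;

/* Hypothetical model of a logical slot created in the new cluster. */
typedef struct SketchSlot
{
    uint64_t restart_lsn;          /* oldest WAL the slot still needs */
} SketchSlot;

/* Models a WAL reset: everything before the new start position is gone. */
void
sketch_reset_wal(SketchCluster *cluster, uint64_t new_start_lsn)
{
    cluster->oldest_available_lsn = new_start_lsn;
    cluster->insert_lsn = new_start_lsn;
}

/* In this model, a newly created slot starts at the current end of WAL. */
SketchSlot
sketch_create_slot(const SketchCluster *cluster)
{
    SketchSlot slot = {cluster->insert_lsn};
    return slot;
}

/* A slot is usable only if the WAL it needs still exists. */
bool
sketch_slot_usable(const SketchCluster *cluster, const SketchSlot *slot)
{
    return slot->restart_lsn >= cluster->oldest_available_lsn;
}
```

In this model, any WAL written between the reset and slot creation only moves insert_lsn forward, and a slot created afterwards starts past it, which mirrors the comment's point that such WAL is not needed downstream.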
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 355247a58b..f8258d7c28 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -110,11 +110,7 @@ typedef struct LogicalDecodingContext
/* Are we processing the end LSN of a transaction? */
bool end_xact;
- /*
- * Did the logical decoding context require processing WALs?
- *
- * This flag is used only when in 'fast_forward' mode.
- */
+ /* Do we need to process any change in 'fast_forward' mode? */
bool processing_required;
} LogicalDecodingContext;