On Thu, Oct 5, 2023 at 4:24 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > Today, I discussed this problem with Andres at PGConf NYC and he > > > suggested as following. To verify, if there is any pending unexpected > > > WAL after shutdown, we can have an API like > > > pg_logical_replication_slot_advance() which will simply process > > > records without actually sending anything downstream.
+1 for this approach. It looks neat. I think we also need to add TAP tests to generate decodable WAL records (RUNNING_XACT, CHECKPOINT_ONLINE, XLOG_FPI_FOR_HINT, XLOG_SWITCH, XLOG_PARAMETER_CHANGE, XLOG_HEAP2_PRUNE) during pg_upgrade as described here https://www.postgresql.org/message-id/TYAPR01MB58660273EACEFC5BF256B133F50DA%40TYAPR01MB5866.jpnprd01.prod.outlook.com. Basically, these were the exceptional WAL records that may be generated by pg_upgrade, so having tests for them is good. > > So I assume in each lower-level decode function (e.g. heap_decode() ) > > we will add the check that if we are checking the WAL for an upgrade > > then from that level we will return true or false based on whether the > > WAL is decodable or not. Is my understanding correct? > > > > Yes, this is one way to achive but I think this will require changing return > value of many APIs. Can we somehow just get this via LogicalDecodingContext > or some other way at the caller by allowing to set some variable at required > places? +1 for adding the required flags to the decoding context similar to fast_forward. Another way without adding any new variables is to pass the WAL record to LogicalDecodingProcessRecord, and upon return check the reorder buffer if there's any decoded change generated for the xid associated with the WAL record. If any decoded change related to the WAL record xid is found, then that's the end for the new function. Here's what I think [1], haven't tested it. [1] change_found = false; end_of_wal = false; ctx = CreateDecodingContext(); XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); while(!end_of_wal || !change_found) { XLogRecord *record; TransactionId xid; ReorderBufferTXN *txn; record = XLogReadRecord(ctx->reader, &errm); if (record) LogicalDecodingProcessRecord(ctx, ctx->reader); xid = XLogRecGetXid(record); txn = ReorderBufferTXNByXid(ctx->reorder, xid, false, NULL, InvalidXLogRecPtr, false); if (txn != NULL) { change_found = true; break; } CHECK_FOR_INTERRUPTS(); } -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com