On Fri, Mar 11, 2022 at 6:31 PM Thomas Munro <[email protected]> wrote:
> Thanks for your review of 0001! It gave me a few things to think
> about and some good improvements.
And just in case it's useful, here's what changed between v21 and v22:
diff --git a/src/backend/access/transam/xlogreader.c
b/src/backend/access/transam/xlogreader.c
index 86a7b4c5c8..0d0c556b7c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -90,8 +90,8 @@ XLogReaderSetDecodeBuffer(XLogReaderState *state, void
*buffer, size_t size)
state->decode_buffer = buffer;
state->decode_buffer_size = size;
- state->decode_buffer_head = buffer;
state->decode_buffer_tail = buffer;
+ state->decode_buffer_head = buffer;
}
/*
@@ -271,7 +271,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/*
* See if we can release the last record that was returned by
- * XLogNextRecord(), to free up space.
+ * XLogNextRecord(), if any, to free up space.
*/
void
XLogReleasePreviousRecord(XLogReaderState *state)
@@ -283,16 +283,16 @@ XLogReleasePreviousRecord(XLogReaderState *state)
/*
* Remove it from the decoded record queue. It must be the oldest item
- * decoded, decode_queue_tail.
+ * decoded, decode_queue_head.
*/
record = state->record;
- Assert(record == state->decode_queue_tail);
+ Assert(record == state->decode_queue_head);
state->record = NULL;
- state->decode_queue_tail = record->next;
+ state->decode_queue_head = record->next;
- /* It might also be the newest item decoded, decode_queue_head. */
- if (state->decode_queue_head == record)
- state->decode_queue_head = NULL;
+ /* It might also be the newest item decoded, decode_queue_tail. */
+ if (state->decode_queue_tail == record)
+ state->decode_queue_tail = NULL;
/* Release the space. */
if (unlikely(record->oversized))
@@ -302,11 +302,11 @@ XLogReleasePreviousRecord(XLogReaderState *state)
}
else
{
- /* It must be the tail record in the decode buffer. */
- Assert(state->decode_buffer_tail == (char *) record);
+ /* It must be the head (oldest) record in the decode buffer. */
+ Assert(state->decode_buffer_head == (char *) record);
/*
- * We need to update tail to point to the next record that is
in the
+ * We need to update head to point to the next record that is
in the
* decode buffer, if any, being careful to skip oversized ones
* (they're not in the decode buffer).
*/
@@ -316,8 +316,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
if (record)
{
- /* Adjust tail to release space up to the next record.
*/
- state->decode_buffer_tail = (char *) record;
+ /* Adjust head to release space up to the next record.
*/
+ state->decode_buffer_head = (char *) record;
}
else
{
@@ -327,8 +327,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
* we'll keep overwriting the same piece of memory if
we're not
* doing any prefetching.
*/
- state->decode_buffer_tail = state->decode_buffer;
state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
}
}
}
@@ -351,7 +351,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
/* Release the last record returned by XLogNextRecord(). */
XLogReleasePreviousRecord(state);
- if (state->decode_queue_tail == NULL)
+ if (state->decode_queue_head == NULL)
{
*errormsg = NULL;
if (state->errormsg_deferred)
@@ -376,7 +376,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
* XLogRecXXX(xlogreader) macros, which work with the decoder rather
than
* the record for historical reasons.
*/
- state->record = state->decode_queue_tail;
+ state->record = state->decode_queue_head;
/*
* Update the pointers to the beginning and one-past-the-end of this
@@ -428,12 +428,12 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
if (!XLogReaderHasQueuedRecordOrError(state))
XLogReadAhead(state, false /* nonblocking */ );
- /* Consume the tail record or error. */
+ /* Consume the head record or error. */
decoded = XLogNextRecord(state, errormsg);
if (decoded)
{
/*
- * XLogReadRecord() returns a pointer to the record's header,
not the
+ * This function returns a pointer to the record's header, not
the
* actual decoded record. The caller will access the decoded
record
* through the XLogRecGetXXX() macros, which reach the decoded
* recorded as xlogreader->record.
@@ -451,6 +451,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
* decoded record wouldn't fit in the decode buffer and must eventually be
* freed explicitly.
*
+ * The caller is responsible for adjusting decode_buffer_tail with the real
+ * size after successfully decoding a record into this space. This way, if
+ * decoding fails, then there is nothing to undo unless the 'oversized' flag
+ * was set and pfree() must be called.
+ *
* Return NULL if there is no space in the decode buffer and allow_oversized
* is false, or if memory allocation fails for an oversized buffer.
*/
@@ -470,21 +475,23 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t
xl_tot_len, bool allow_oversi
state->decode_buffer_tail = state->decode_buffer;
state->free_decode_buffer = true;
}
- if (state->decode_buffer_head >= state->decode_buffer_tail)
+
+ /* Try to allocate space in the circular decode buffer. */
+ if (state->decode_buffer_tail >= state->decode_buffer_head)
{
- /* Empty, or head is to the right of tail. */
- if (state->decode_buffer_head + required_space <=
+ /* Empty, or tail is to the right of head. */
+ if (state->decode_buffer_tail + required_space <=
state->decode_buffer + state->decode_buffer_size)
{
- /* There is space between head and end. */
- decoded = (DecodedXLogRecord *)
state->decode_buffer_head;
+ /* There is space between tail and end. */
+ decoded = (DecodedXLogRecord *)
state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
}
else if (state->decode_buffer + required_space <
- state->decode_buffer_tail)
+ state->decode_buffer_head)
{
- /* There is space between start and tail. */
+ /* There is space between start and head. */
decoded = (DecodedXLogRecord *) state->decode_buffer;
decoded->oversized = false;
return decoded;
@@ -492,12 +499,12 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t
xl_tot_len, bool allow_oversi
}
else
{
- /* Head is to the left of tail. */
- if (state->decode_buffer_head + required_space <
- state->decode_buffer_tail)
+ /* Tail is to the left of head. */
+ if (state->decode_buffer_tail + required_space <
+ state->decode_buffer_head)
{
- /* There is space between head and tail. */
- decoded = (DecodedXLogRecord *)
state->decode_buffer_head;
+ /* There is space between tail and head. */
+ decoded = (DecodedXLogRecord *)
state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
}
@@ -513,7 +520,7 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t
xl_tot_len, bool allow_oversi
return decoded;
}
- return decoded;
+ return NULL;
}
static XLogPageReadResult
@@ -748,7 +755,6 @@ restart:
if (pageHeader->xlp_info &
XLP_FIRST_IS_OVERWRITE_CONTRECORD)
{
state->overwrittenRecPtr = RecPtr;
- //ResetDecoder(state);
RecPtr = targetPagePtr;
goto restart;
}
@@ -865,18 +871,18 @@ restart:
/* The new decode buffer head must be MAXALIGNed. */
Assert(decoded->size == MAXALIGN(decoded->size));
if ((char *) decoded == state->decode_buffer)
- state->decode_buffer_head =
state->decode_buffer + decoded->size;
+ state->decode_buffer_tail =
state->decode_buffer + decoded->size;
else
- state->decode_buffer_head += decoded->size;
+ state->decode_buffer_tail += decoded->size;
}
/* Insert it into the queue of decoded records. */
- Assert(state->decode_queue_head != decoded);
- if (state->decode_queue_head)
- state->decode_queue_head->next = decoded;
- state->decode_queue_head = decoded;
- if (!state->decode_queue_tail)
- state->decode_queue_tail = decoded;
+ Assert(state->decode_queue_tail != decoded);
+ if (state->decode_queue_tail)
+ state->decode_queue_tail->next = decoded;
+ state->decode_queue_tail = decoded;
+ if (!state->decode_queue_head)
+ state->decode_queue_head = decoded;
return XLREAD_SUCCESS;
}
else
@@ -935,8 +941,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
result = XLogDecodeNextRecord(state, nonblocking);
if (result == XLREAD_SUCCESS)
{
- Assert(state->decode_queue_head != NULL);
- return state->decode_queue_head;
+ Assert(state->decode_queue_tail != NULL);
+ return state->decode_queue_tail;
}
return NULL;
@@ -946,8 +952,14 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
* Read a single xlog page including at least [pageptr, reqLen] of valid data
* via the page_read() callback.
*
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns XLREAD_FAIL if the required page cannot be read for some
+ * reason; errormsg_buf is set in that case (unless the error occurs in the
+ * page_read callback).
+ *
+ * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
+ * waiting. This can be returned only if the installed page_read callback
+ * respects the state->nonblocking flag, and cannot read the requested data
+ * immediately.
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -1334,6 +1346,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr
RecPtr)
Assert(!XLogRecPtrIsInvalid(RecPtr));
+ /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
+ state->nonblocking = false;
+
/*
* skip over potential continuation data, keeping in mind that it may
span
* multiple pages
@@ -1544,19 +1559,19 @@ ResetDecoder(XLogReaderState *state)
DecodedXLogRecord *r;
/* Reset the decoded record queue, freeing any oversized records. */
- while ((r = state->decode_queue_tail))
+ while ((r = state->decode_queue_head) != NULL)
{
- state->decode_queue_tail = r->next;
+ state->decode_queue_head = r->next;
if (r->oversized)
pfree(r);
}
- state->decode_queue_head = NULL;
state->decode_queue_tail = NULL;
+ state->decode_queue_head = NULL;
state->record = NULL;
/* Reset the decode buffer to empty. */
- state->decode_buffer_head = state->decode_buffer;
state->decode_buffer_tail = state->decode_buffer;
+ state->decode_buffer_head = state->decode_buffer;
/* Clear error state. */
state->errormsg_buf[0] = '\0';
diff --git a/src/backend/access/transam/xlogutils.c
b/src/backend/access/transam/xlogutils.c
index 44d9313422..ea22577b41 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -373,7 +373,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
* going to initialize it. And vice versa.
*/
zeromode = (mode == RBM_ZERO_AND_LOCK || mode ==
RBM_ZERO_AND_CLEANUP_LOCK);
- willinit = (record->record->blocks[block_id].flags &
BKPBLOCK_WILL_INIT) != 0;
+ willinit = (XLogRecGetBlock(record, block_id)->flags &
BKPBLOCK_WILL_INIT) != 0;
if (willinit && !zeromode)
elog(PANIC, "block with WILL_INIT flag in WAL record must be
zeroed by redo routine");
if (!willinit && zeromode)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index c129df44ac..a33ad034c0 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -403,14 +403,13 @@ XLogDumpRecordLen(XLogReaderState *record, uint32
*rec_len, uint32 *fpi_len)
* Calculate the amount of FPI data in the record.
*
* XXX: We peek into xlogreader's private decoded backup blocks for the
- * bimg_len indicating the length of FPI data. It doesn't seem worth it
to
- * add an accessor macro for this.
+ * bimg_len indicating the length of FPI data.
*/
*fpi_len = 0;
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
if (XLogRecHasBlockImage(record, block_id))
- *fpi_len += record->record->blocks[block_id].bimg_len;
+ *fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
}
/*
@@ -552,7 +551,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config,
XLogReaderState *record)
blk);
if (XLogRecHasBlockImage(record, block_id))
{
- uint8 bimg_info =
record->record->blocks[block_id].bimg_info;
+ uint8 bimg_info =
XLogRecGetBlock(record, block_id)->bimg_info;
if (BKPIMAGE_COMPRESSED(bimg_info))
{
@@ -569,11 +568,11 @@ XLogDumpDisplayRecord(XLogDumpConfig *config,
XLogReaderState *record)
"compression saved: %u,
method: %s",
XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification",
-
record->record->blocks[block_id].hole_offset,
-
record->record->blocks[block_id].hole_length,
+ XLogRecGetBlock(record,
block_id)->hole_offset,
+ XLogRecGetBlock(record,
block_id)->hole_length,
BLCKSZ -
-
record->record->blocks[block_id].hole_length -
-
record->record->blocks[block_id].bimg_len,
+ XLogRecGetBlock(record,
block_id)->hole_length -
+ XLogRecGetBlock(record,
block_id)->bimg_len,
method);
}
else
@@ -581,8 +580,8 @@ XLogDumpDisplayRecord(XLogDumpConfig *config,
XLogReaderState *record)
printf(" (FPW%s); hole: offset: %u,
length: %u",
XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification",
-
record->record->blocks[block_id].hole_offset,
-
record->record->blocks[block_id].hole_length);
+ XLogRecGetBlock(record,
block_id)->hole_offset,
+ XLogRecGetBlock(record,
block_id)->hole_length);
}
}
putchar('\n');
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 86a26a9231..8446050225 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -249,16 +249,16 @@ struct XLogReaderState
char *decode_buffer;
size_t decode_buffer_size;
bool free_decode_buffer; /* need to free? */
- char *decode_buffer_head; /* write head */
- char *decode_buffer_tail; /* read head */
+ char *decode_buffer_head; /* data is read from the head */
+ char *decode_buffer_tail; /* new data is written at the tail */
/*
* Queue of records that have been decoded. This is a linked list that
* usually consists of consecutive records in decode_buffer, but may
also
* contain oversized records allocated with palloc().
*/
- DecodedXLogRecord *decode_queue_head; /* newest decoded record */
- DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */
+ DecodedXLogRecord *decode_queue_head; /* oldest decoded record */
+ DecodedXLogRecord *decode_queue_tail; /* newest decoded record */
/*
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at
least
@@ -350,7 +350,7 @@ extern XLogRecPtr XLogFindNextRecord(XLogReaderState
*state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
/* Return values from XLogPageReadCB. */
-typedef enum XLogPageReadResultResult
+typedef enum XLogPageReadResult
{
XLREAD_SUCCESS = 0, /* record is successfully read
*/
XLREAD_FAIL = -1, /* failed during reading a
record */