Hi Hackers, In order to make incremental backup (https://wiki.postgresql.org/wiki/Incremental_backup) efficient we need a way to track the LSN of a page in a way that we can retrieve it without reading the actual block. Below is my proposal on how to achieve it.
LSN Map ------- The purpose of the LSN map is to quickly know if a page of a relation has been modified after a specified checkpoint. Implementation -------------- We create an additional fork which contains a raw stream of LSNs. To limit the space used, every entry represents the maximum LSN of a group of blocks of a fixed size. I chose arbitrarily the size of 2048, which is equivalent to 16MB of heap data, which means that we need 64k entries to track one terabyte of heap. Name ---- I've called this map LSN map, and I've named the corresponding fork file as "lm". WAL logging ----------- At the moment the map is not WAL logged, but is updated during WAL replay. I'm not deep enough in WAL mechanics to see if the current approach is sane or if we should change it. Current limits -------------- The current implementation tracks only heap LSNs. It currently does not track any kind of indexes, but this can be easily added later. The implementation of commands that rewrite the whole table can be improved: cluster uses shared memory buffers instead of writing the map directly on the disk, and moving a table to another tablespace simply drops the map instead of updating it correctly. Further ideas ------------- The current implementation updates an entry in the map every time the block gets its LSN bumped, but we really only need to know which is the first checkpoint that contains expired data. So setting the entry to the last checkpoint LSN is probably enough, and will reduce the number of writes. To implement this we only need a backend-local copy of the last checkpoint LSN, which is updated during each XLogInsert. Again, I'm not deep enough in replication mechanics to see if this approach could work on a standby using restartpoints instead of checkpoints. Please advise on the best way to implement it. Conclusions ------------ This code is incomplete, and the xlog replay part must be improved/fixed, but I think it's a good start to have this feature. 
I will appreciate any review, advice or criticism. Regards, Marco -- Marco Nenciarini - 2ndQuadrant Italy PostgreSQL Training, Services and Support marco.nenciar...@2ndquadrant.it | www.2ndQuadrant.it
From 89a943032f0a10fd093c126d15fbf81e5861dbe3 Mon Sep 17 00:00:00 2001 From: Marco Nenciarini <marco.nenciar...@2ndquadrant.it> Date: Mon, 3 Nov 2014 17:52:27 +0100 Subject: [PATCH] LSN Map This is a WIP. Only heap is supported. No indexes, no sequences. --- src/backend/access/heap/Makefile | 2 +- src/backend/access/heap/heapam.c | 239 ++++++++++++++++++++++-- src/backend/access/heap/hio.c | 11 +- src/backend/access/heap/lsnmap.c | 336 ++++++++++++++++++++++++++++++++++ src/backend/access/heap/pruneheap.c | 10 + src/backend/access/heap/rewriteheap.c | 37 +++- src/backend/catalog/storage.c | 8 + src/backend/commands/tablecmds.c | 5 +- src/backend/commands/vacuumlazy.c | 35 +++- src/backend/storage/smgr/smgr.c | 1 + src/common/relpath.c | 5 +- src/include/access/hio.h | 3 +- src/include/access/lsnmap.h | 28 +++ src/include/common/relpath.h | 5 +- src/include/storage/smgr.h | 1 + 15 files changed, 687 insertions(+), 39 deletions(-) create mode 100644 src/backend/access/heap/lsnmap.c create mode 100644 src/include/access/lsnmap.h diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile index b83d496..776ee7d 100644 *** a/src/backend/access/heap/Makefile --- b/src/backend/access/heap/Makefile *************** subdir = src/backend/access/heap *** 12,17 **** top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o include $(top_srcdir)/src/backend/common.mk --- 12,17 ---- top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! 
OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o lsnmap.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 21e9d06..9486562 100644 *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 48,53 **** --- 48,54 ---- #include "access/tuptoaster.h" #include "access/valid.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" *************** heap_insert(Relation relation, HeapTuple *** 2067,2073 **** TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; /* --- 2068,2075 ---- TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; bool all_visible_cleared = false; /* *************** heap_insert(Relation relation, HeapTuple *** 2097,2103 **** */ buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); --- 2099,2106 ---- */ buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL, ! 
&lmbuffer, NULL); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); *************** heap_insert(Relation relation, HeapTuple *** 2192,2197 **** --- 2195,2205 ---- recptr = XLogInsert(RM_HEAP_ID, info); PageSetLSN(page, recptr); + + /* + * Update the LSN map + */ + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** heap_insert(Relation relation, HeapTuple *** 2199,2204 **** --- 2207,2214 ---- UnlockReleaseBuffer(buffer); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); /* * If tuple is cachable, mark it for invalidation from the caches in case *************** heap_multi_insert(Relation relation, Hea *** 2346,2352 **** while (ndone < ntuples) { Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; int nthispage; --- 2356,2363 ---- while (ndone < ntuples) { Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; bool all_visible_cleared = false; int nthispage; *************** heap_multi_insert(Relation relation, Hea *** 2358,2364 **** */ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL); page = BufferGetPage(buffer); /* NO EREPORT(ERROR) from here till changes are logged */ --- 2369,2376 ---- */ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL, ! 
&lmbuffer, NULL); page = BufferGetPage(buffer); /* NO EREPORT(ERROR) from here till changes are logged */ *************** heap_multi_insert(Relation relation, Hea *** 2502,2507 **** --- 2514,2521 ---- recptr = XLogInsert(RM_HEAP2_ID, info); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** heap_multi_insert(Relation relation, Hea *** 2509,2514 **** --- 2523,2530 ---- UnlockReleaseBuffer(buffer); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); ndone += nthispage; } *************** heap_delete(Relation relation, ItemPoint *** 2629,2635 **** Page page; BlockNumber block; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; TransactionId new_xmax; uint16 new_infomask, new_infomask2; --- 2645,2652 ---- Page page; BlockNumber block; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; TransactionId new_xmax; uint16 new_infomask, new_infomask2; *************** heap_delete(Relation relation, ItemPoint *** 2645,2650 **** --- 2662,2670 ---- buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, block, &lmbuffer); + /* * Before locking the buffer, pin the visibility map page if it appears to * be necessary. Since we haven't got the lock yet, someone else might be *************** l1: *** 2797,2802 **** --- 2817,2824 ---- UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return result; } *************** l1: *** 2912,2917 **** --- 2934,2941 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); PageSetLSN(page, recptr); + + lsnmap_set(relation, block, lmbuffer, recptr); } END_CRIT_SECTION(); *************** l1: *** 2920,2926 **** if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); ! 
/* * If the tuple has toasted out-of-line attributes, we need to delete * those items too. We have to do this before releasing the buffer --- 2944,2951 ---- if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); ! if (lmbuffer != InvalidBuffer) ! ReleaseBuffer(lmbuffer); /* * If the tuple has toasted out-of-line attributes, we need to delete * those items too. We have to do this before releasing the buffer *************** heap_update(Relation relation, ItemPoint *** 3053,3059 **** Buffer buffer, newbuf, vmbuffer = InvalidBuffer, ! vmbuffer_new = InvalidBuffer; bool need_toast, already_marked; Size newtupsize, --- 3078,3086 ---- Buffer buffer, newbuf, vmbuffer = InvalidBuffer, ! vmbuffer_new = InvalidBuffer, ! lmbuffer = InvalidBuffer, ! lmbuffer_new = InvalidBuffer; bool need_toast, already_marked; Size newtupsize, *************** heap_update(Relation relation, ItemPoint *** 3099,3104 **** --- 3126,3134 ---- buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, block, &lmbuffer); + /* * Before locking the buffer, pin the visibility map page if it appears to * be necessary. Since we haven't got the lock yet, someone else might be *************** l2: *** 3390,3395 **** --- 3420,3427 ---- UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); bms_free(hot_attrs); bms_free(key_attrs); return result; *************** l2: *** 3570,3576 **** /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer); } else { --- 3602,3609 ---- /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer, ! 
&lmbuffer_new, &lmbuffer); } else { *************** l2: *** 3588,3594 **** LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer); } else { --- 3621,3628 ---- LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer, ! &lmbuffer_new, &lmbuffer); } else { *************** l2: *** 3740,3747 **** --- 3774,3783 ---- if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); + lsnmap_set(relation, BufferGetBlockNumber(newbuf), lmbuffer_new, recptr); } PageSetLSN(BufferGetPage(buffer), recptr); + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** l2: *** 3768,3774 **** ReleaseBuffer(vmbuffer_new); if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); ! /* * Release the lmgr tuple lock, if we had it. */ --- 3804,3813 ---- ReleaseBuffer(vmbuffer_new); if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); ! if (BufferIsValid(lmbuffer_new)) ! ReleaseBuffer(lmbuffer_new); ! if (BufferIsValid(lmbuffer)) ! ReleaseBuffer(lmbuffer); /* * Release the lmgr tuple lock, if we had it. */ *************** heap_lock_tuple(Relation relation, HeapT *** 4091,4096 **** --- 4130,4136 ---- HTSU_Result result; ItemPointer tid = &(tuple->t_self); ItemId lp; + Buffer lmbuffer = InvalidBuffer; Page page; TransactionId xid, xmax; *************** failed: *** 4567,4572 **** --- 4607,4615 ---- return HeapTupleMayBeUpdated; } + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(*buffer), &lmbuffer); + /* * If this is the first possibly-multixact-able operation in the current * transaction, set my per-backend OldestMemberMXactId setting. 
We can be *************** failed: *** 4647,4652 **** --- 4690,4697 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(*buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** failed: *** 4658,4663 **** --- 4703,4711 ---- * visibility info. */ + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * Now that we have successfully marked the tuple as locked, we can * release the lmgr tuple lock, if we had it. *************** heap_lock_updated_tuple_rec(Relation rel *** 5100,5106 **** { ItemPointerData tupid; HeapTupleData mytup; ! Buffer buf; uint16 new_infomask, new_infomask2, old_infomask, --- 5148,5155 ---- { ItemPointerData tupid; HeapTupleData mytup; ! Buffer buf, ! lmbuffer = InvalidBuffer; uint16 new_infomask, new_infomask2, old_infomask, *************** heap_lock_updated_tuple_rec(Relation rel *** 5129,5134 **** --- 5178,5186 ---- return HeapTupleMayBeUpdated; } + if (RelationNeedsWAL(rel)) + lsnmap_pin(rel, BufferGetBlockNumber(buf), &lmbuffer); + l4: CHECK_FOR_INTERRUPTS(); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); *************** l4: *** 5142,5147 **** --- 5194,5201 ---- priorXmax)) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return HeapTupleMayBeUpdated; } *************** l4: *** 5189,5194 **** --- 5243,5250 ---- if (res != HeapTupleMayBeUpdated) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); pfree(members); return res; } *************** l4: *** 5249,5254 **** --- 5305,5312 ---- if (res != HeapTupleMayBeUpdated) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return res; } } *************** l4: *** 5289,5298 **** --- 5347,5361 ---- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED); PageSetLSN(page, recptr); + + lsnmap_set(rel, BufferGetBlockNumber(buf), lmbuffer, recptr); } END_CRIT_SECTION(); + if (lmbuffer != InvalidBuffer) + 
ReleaseBuffer(lmbuffer); + /* if we find the end of update chain, we're done. */ if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID || ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) || *************** heap_lock_updated_tuple(Relation rel, He *** 5374,5380 **** void heap_inplace_update(Relation relation, HeapTuple tuple) { ! Buffer buffer; Page page; OffsetNumber offnum; ItemId lp = NULL; --- 5437,5444 ---- void heap_inplace_update(Relation relation, HeapTuple tuple) { ! Buffer buffer, ! lmbuffer = InvalidBuffer; Page page; OffsetNumber offnum; ItemId lp = NULL; *************** heap_inplace_update(Relation relation, H *** 5383,5388 **** --- 5447,5456 ---- uint32 newlen; buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); + + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); *************** heap_inplace_update(Relation relation, H *** 5426,5437 **** --- 5494,5510 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); UnlockReleaseBuffer(buffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * Send out shared cache inval if necessary. Note that because we only * pass the new version of the tuple, this mustn't be used for any *************** heap_xlog_clean(XLogReaderState *record) *** 7024,7029 **** --- 7097,7115 ---- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * If we have a full-page image, restore it (using a cleanup lock) and * we're done. 
*/ *************** heap_xlog_freeze_page(XLogReaderState *r *** 7208,7225 **** xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; int ntup; /* * In Hot Standby mode, ensure that there's no queries running which still * consider the frozen xids as running. */ if (InHotStandby) { ! RelFileNode rnode; ! XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); ! ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) --- 7294,7323 ---- xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; + RelFileNode rnode; + BlockNumber blkno; int ntup; + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + /* * In Hot Standby mode, ensure that there's no queries running which still * consider the frozen xids as running. */ if (InHotStandby) + ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode); + + /* + * Update the LSN map + */ { ! Relation reln = CreateFakeRelcacheEntry(rnode); ! Buffer lmbuffer = InvalidBuffer; ! lsnmap_pin(reln, blkno, &lmbuffer); ! lsnmap_set(reln, blkno, lmbuffer, lsn); ! ReleaseBuffer(lmbuffer); ! 
FreeFakeRelcacheEntry(reln); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) *************** heap_xlog_delete(XLogReaderState *record *** 7309,7314 **** --- 7407,7425 ---- FreeFakeRelcacheEntry(reln); } + /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(target_node); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); *************** heap_xlog_insert(XLogReaderState *record *** 7385,7390 **** --- 7496,7514 ---- } /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(target_node); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * If we inserted the first and only tuple on the page, re-initialize the * page from scratch. 
*/ *************** heap_xlog_multi_insert(XLogReaderState * *** 7504,7509 **** --- 7628,7646 ---- FreeFakeRelcacheEntry(reln); } + /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (isinit) { buffer = XLogInitBufferForRedo(record, 0); *************** heap_xlog_update(XLogReaderState *record *** 7660,7665 **** --- 7797,7820 ---- } /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, oldblk, &lmbuffer); + lsnmap_set(reln, oldblk, lmbuffer, lsn); + if (oldblk != newblk) + { + lsnmap_pin(reln, newblk, &lmbuffer); + lsnmap_set(reln, newblk, lmbuffer, lsn); + } + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * In normal operation, it is important to lock the two pages in * page-number order, to avoid possible deadlocks against other update * operations going the other way. 
However, during WAL replay there can *************** heap_xlog_lock(XLogReaderState *record) *** 7882,7887 **** --- 8037,8060 ---- ItemId lp = NULL; HeapTupleHeader htup; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); *************** heap_xlog_lock_updated(XLogReaderState * *** 7930,7935 **** --- 8103,8126 ---- ItemId lp = NULL; HeapTupleHeader htup; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + xlrec = (xl_heap_lock_updated *) XLogRecGetData(record); if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) *************** heap_xlog_inplace(XLogReaderState *recor *** 7969,7974 **** --- 8160,8183 ---- uint32 oldlen; Size newlen; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { char *newtup = XLogRecGetBlockData(record, 0, &newlen); diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index 
6d091f6..09e93d0 100644 *** a/src/backend/access/heap/hio.c --- b/src/backend/access/heap/hio.c *************** *** 19,24 **** --- 19,25 ---- #include "access/hio.h" #include "access/htup_details.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/lmgr.h" *************** Buffer *** 215,221 **** RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other) { bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM); Buffer buffer = InvalidBuffer; --- 216,223 ---- RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other, ! Buffer *lmbuffer, Buffer *lmbuffer_other) { bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM); Buffer buffer = InvalidBuffer; *************** RelationGetBufferForTuple(Relation relat *** 297,302 **** --- 299,308 ---- while (targetBlock != InvalidBlockNumber) { + + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, targetBlock, lmbuffer); + /* * Read and exclusive-lock the target block, as well as the other * block if one was given, taking suitable care with lock ordering and *************** RelationGetBufferForTuple(Relation relat *** 438,443 **** --- 444,452 ---- */ buffer = ReadBufferBI(relation, P_NEW, bistate); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(buffer), lmbuffer); + /* * We can be certain that locking the otherBuffer first is OK, since it * must have a lower page number. 
diff --git a/src/backend/access/heap/lsnmap.c b/src/backend/access/heap/lsnmap.c index ...e736ed6 100644 *** a/src/backend/access/heap/lsnmap.c --- b/src/backend/access/heap/lsnmap.c *************** *** 0 **** --- 1,336 ---- + /*------------------------------------------------------------------------- + * + * lsnmap.c + * map for tracking LSN of heap blocks + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/heap/lsnmap.c + * + * INTERFACE ROUTINES + * lsnmap_pin - pin a map page for setting a bit + * lsnmap_set - set a bit in a previously pinned page + * lsnmap_truncate - truncate the LSN map + * + * NOTES + * + * The LSN map contains an LSN per HEAPBLOCKS_PER_ENTRY heap pages. Every entry + * means that no modification have been made to the pages that are part of group + * after that LSN. + * + * The LSN map is not wal logged, but is updated during log reply. + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include "access/heapam_xlog.h" + #include "access/lsnmap.h" + #include "miscadmin.h" + #include "storage/bufmgr.h" + #include "storage/lmgr.h" + #include "storage/smgr.h" + #include "utils/inval.h" + + + /*#define TRACE_LSNMAP */ + + /* Number of pages per LSN map entry/ */ + #define HEAPBLOCKS_PER_ENTRY 2048 + + /* Size of an LSN map entry */ + #define BYTES_PER_ENTRY (sizeof(XLogRecPtr)) + + /* + * Size of the map on each LSN map page, in bytes. There's no + * extra headers, so the whole page minus the standard page header is + * used for the bitmap. + */ + #define MAPSIZE TYPEALIGN_DOWN(BYTES_PER_ENTRY, BLCKSZ - MAXALIGN(SizeOfPageHeaderData)) + + /* Number of heap blocks we can represent in one LSN map page. 
*/ + #define HEAPBLOCKS_PER_PAGE (MAPSIZE / BYTES_PER_ENTRY * HEAPBLOCKS_PER_ENTRY) + + /* Mapping from heap block number to the right bit in the LSN map */ + #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE) + #define HEAPBLK_TO_MAPPOS(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_ENTRY) + + /* prototypes for internal routines */ + static Buffer lm_readbuf(Relation rel, BlockNumber blkno, bool extend); + static void lm_extend(Relation rel, BlockNumber nlmblocks); + + /* + * lsnmap_pin - pin a map page for setting an entry + * + * Setting an entry in the LSN map is a two-phase operation. First, call + * lsnmap_pin, to pin the LSN map page containing the bit for + * the heap page. Because that can require I/O to read the map page, you + * shouldn't hold a lock on the heap page while doing that. Then, call + * lsnmap_set to actually set the bit. + * + * On entry, *buf should be InvalidBuffer or a valid buffer returned by + * an earlier call to lsnmap_pin on the same relation. + * On return, *buf is a valid buffer with the map page containing + * the entry for heapBlk. + * + * If the page doesn't exist in the map file yet, it is extended. + */ + void + lsnmap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf) + { + BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk); + + /* Reuse the old pinned buffer if possible */ + if (BufferIsValid(*buf)) + { + if (BufferGetBlockNumber(*buf) == mapBlock) + return; + + ReleaseBuffer(*buf); + } + *buf = lm_readbuf(rel, mapBlock, true); + } + + /* + * lsnmap_set - set an entry on a previously pinned page + * + * You must pass a buffer containing the correct map page to this function. + * Call lsnmap_pin first to pin the right one. This function doesn't do + * any I/O. 
+ */ + void + lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf, XLogRecPtr lsn) + { + BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk); + uint32 mapPos = HEAPBLK_TO_MAPPOS(heapBlk); + XLogRecPtr *map; + + #ifdef TRACE_LSNMAP + elog(DEBUG1, "lm_set %s %d", RelationGetRelationName(rel), heapBlk); + #endif + + /* Check that we have the right LM page pinned */ + if (!BufferIsValid(lmBuf) || BufferGetBlockNumber(lmBuf) != mapBlock) + elog(ERROR, "wrong LM buffer passed to lsnmap_set"); + + LockBuffer(lmBuf, BUFFER_LOCK_EXCLUSIVE); + + map = (XLogRecPtr *) PageGetContents(BufferGetPage(lmBuf)); + + if (map[mapPos] < lsn) + { + map[mapPos] = lsn; + MarkBufferDirty(lmBuf); + } + + LockBuffer(lmBuf, BUFFER_LOCK_UNLOCK); + } + + /* + * lsnmap_truncate - truncate the LSN map + * + * The caller must hold AccessExclusiveLock on the relation, to ensure that + * other backends receive the smgr invalidation event that this function sends + * before they access the LM again. + * + * nheapblocks is the new size of the heap. + */ + void + lsnmap_truncate(Relation rel, BlockNumber nheapblocks) + { + BlockNumber newnblocks; + + /* last remaining block, byte, and bit */ + BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks); + uint32 truncPos = HEAPBLK_TO_MAPPOS(nheapblocks); + + #ifdef TRACE_LSNMAP + elog(DEBUG1, "lm_truncate %s %d", RelationGetRelationName(rel), nheapblocks); + #endif + + RelationOpenSmgr(rel); + + /* + * If no LSN map has been created yet for this relation, there's + * nothing to truncate. + */ + if (!smgrexists(rel->rd_smgr, LSNMAP_FORKNUM)) + return; + + /* + * Unless the new size is exactly at a LSN map page boundary, the + * tail bits in the last remaining map page, representing truncated heap + * blocks, need to be cleared. This is not only tidy, but also necessary + * because we don't get a chance to clear the bits if the heap is extended + * again. 
+ 	 */
+ 	if (truncPos != 0)
+ 	{
+ 		Buffer		mapBuffer;
+ 		Page		page;
+ 		XLogRecPtr *map;
+ 
+ 		newnblocks = truncBlock + 1;
+ 
+ 		mapBuffer = lm_readbuf(rel, truncBlock, false);
+ 		if (!BufferIsValid(mapBuffer))
+ 		{
+ 			/* nothing to do, the file was already smaller */
+ 			return;
+ 		}
+ 
+ 		page = BufferGetPage(mapBuffer);
+ 		map = (XLogRecPtr *) PageGetContents(page);
+ 
+ 		LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 		/*
+ 		 * Clear out the unwanted entries.  The entry at truncPos is kept:
+ 		 * its group still covers surviving heap blocks, and a too-high LSN
+ 		 * there is harmless (it can only cause an unnecessary backup of
+ 		 * that group).
+ 		 *
+ 		 * BUGFIX: the byte count must be taken from entry truncPos + 1
+ 		 * onward, i.e. MAPSIZE - (truncPos + 1) * BYTES_PER_ENTRY.  The
+ 		 * previous expression, MAPSIZE - (truncPos * BYTES_PER_ENTRY + 1),
+ 		 * mixed entry-index and byte arithmetic and wrote
+ 		 * BYTES_PER_ENTRY - 1 bytes past the end of the entry array.
+ 		 */
+ 		MemSet(&map[truncPos + 1], 0,
+ 			   MAPSIZE - (truncPos + 1) * BYTES_PER_ENTRY);
+ 
+ 		MarkBufferDirty(mapBuffer);
+ 		UnlockReleaseBuffer(mapBuffer);
+ 	}
+ 	else
+ 		newnblocks = truncBlock;
+ 
+ 	if (smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM) <= newnblocks)
+ 	{
+ 		/* nothing to do, the file was already smaller than requested size */
+ 		return;
+ 	}
+ 
+ 	/* Truncate the unused LM pages, and send smgr inval message */
+ 	smgrtruncate(rel->rd_smgr, LSNMAP_FORKNUM, newnblocks);
+ 
+ 	/*
+ 	 * We might as well update the local smgr_lm_nblocks setting. smgrtruncate
+ 	 * sent an smgr cache inval message, which will cause other backends to
+ 	 * invalidate their copy of smgr_lm_nblocks, and this one too at the next
+ 	 * command boundary. But this ensures it isn't outright wrong until then.
+ 	 */
+ 	if (rel->rd_smgr)
+ 		rel->rd_smgr->smgr_lm_nblocks = newnblocks;
+ }
+ 
+ /*
+  * Read a LSN map page.
+  *
+  * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
+  * true, the LSN map file is extended.
+  */
+ static Buffer
+ lm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+ {
+ 	Buffer		buf;
+ 
+ 	/*
+ 	 * We might not have opened the relation at the smgr level yet, or we
+ 	 * might have been forced to close it by a sinval message. The code below
+ 	 * won't necessarily notice relation extension immediately when extend =
+ 	 * false, so we rely on sinval messages to ensure that our ideas about the
+ 	 * size of the map aren't too far out of date.
+ 	 */
+ 	RelationOpenSmgr(rel);
+ 
+ 	/*
+ 	 * If we haven't cached the size of the LSN map fork yet, check it
+ 	 * first.
+ 	 */
+ 	if (rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber)
+ 	{
+ 		if (smgrexists(rel->rd_smgr, LSNMAP_FORKNUM))
+ 			rel->rd_smgr->smgr_lm_nblocks = smgrnblocks(rel->rd_smgr,
+ 														LSNMAP_FORKNUM);
+ 		else
+ 			rel->rd_smgr->smgr_lm_nblocks = 0;
+ 	}
+ 
+ 	/* Handle requests beyond EOF */
+ 	if (blkno >= rel->rd_smgr->smgr_lm_nblocks)
+ 	{
+ 		if (extend)
+ 			lm_extend(rel, blkno + 1);
+ 		else
+ 			return InvalidBuffer;
+ 	}
+ 
+ 	/*
+ 	 * Use ZERO_ON_ERROR mode, and initialize the page if necessary.
+ 	 *
+ 	 * NOTE(review): this rationale was inherited from the visibility map,
+ 	 * where clearing bits is always safe.  For the LSN map a zeroed entry
+ 	 * reads as "not modified since any checkpoint", so silently zeroing a
+ 	 * corrupt page could make an incremental backup skip modified blocks --
+ 	 * confirm whether erroring out (or forcing a full copy of the affected
+ 	 * range) would be the safer recovery here.
+ 	 */
+ 	buf = ReadBufferExtended(rel, LSNMAP_FORKNUM, blkno,
+ 							 RBM_ZERO_ON_ERROR, NULL);
+ 	if (PageIsNew(BufferGetPage(buf)))
+ 		PageInit(BufferGetPage(buf), BLCKSZ, 0);
+ 	return buf;
+ }
+ 
+ /*
+  * Ensure that the LSN map fork is at least lm_nblocks long, extending
+  * it if necessary with zeroed pages.
+  */
+ static void
+ lm_extend(Relation rel, BlockNumber lm_nblocks)
+ {
+ 	BlockNumber lm_nblocks_now;
+ 	Page		pg;
+ 
+ 	pg = (Page) palloc(BLCKSZ);
+ 	PageInit(pg, BLCKSZ, 0);
+ 
+ 	/*
+ 	 * We use the relation extension lock to lock out other backends trying to
+ 	 * extend the LSN map at the same time. It also locks out extension
+ 	 * of the main fork, unnecessarily, but extending the LSN map
+ 	 * happens seldom enough that it doesn't seem worthwhile to have a
+ 	 * separate lock tag type for it.
+ 	 *
+ 	 * Note that another backend might have extended or created the relation
+ 	 * by the time we get the lock.
+ 	 */
+ 	LockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	/* Might have to re-open if a cache flush happened */
+ 	RelationOpenSmgr(rel);
+ 
+ 	/*
+ 	 * Create the file first if it doesn't exist. If smgr_lm_nblocks is
+ 	 * positive then it must exist, no need for an smgrexists call.
+ 	 */
+ 	if ((rel->rd_smgr->smgr_lm_nblocks == 0 ||
+ 		 rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber) &&
+ 		!smgrexists(rel->rd_smgr, LSNMAP_FORKNUM))
+ 		smgrcreate(rel->rd_smgr, LSNMAP_FORKNUM, false);
+ 
+ 	lm_nblocks_now = smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM);
+ 
+ 	/* Now extend the file */
+ 	while (lm_nblocks_now < lm_nblocks)
+ 	{
+ 		PageSetChecksumInplace(pg, lm_nblocks_now);
+ 
+ 		smgrextend(rel->rd_smgr, LSNMAP_FORKNUM, lm_nblocks_now,
+ 				   (char *) pg, false);
+ 		lm_nblocks_now++;
+ 	}
+ 
+ 	/*
+ 	 * Send a shared-inval message to force other backends to close any smgr
+ 	 * references they may have for this rel, which we are about to change.
+ 	 * This is a useful optimization because it means that backends don't have
+ 	 * to keep checking for creation or extension of the file, which happens
+ 	 * infrequently.
+ 	 */
+ 	CacheInvalidateSmgr(rel->rd_smgr->smgr_rnode);
+ 
+ 	/* Update local cache with the up-to-date size */
+ 	rel->rd_smgr->smgr_lm_nblocks = lm_nblocks_now;
+ 
+ 	UnlockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	pfree(pg);
+ }
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 563e5c3..4586ef3 100644
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 18,23 ****
--- 18,24 ----
  #include "access/heapam_xlog.h"
  #include "access/transam.h"
  #include "access/htup_details.h"
+ #include "access/lsnmap.h"
  #include "access/xlog.h"
  #include "catalog/catalog.h"
  #include "miscadmin.h"
*************** heap_page_prune(Relation relation, Buffe
*** 175,184 ****
--- 176,189 ----
  {
  	int			ndeleted = 0;
  	Page		page = BufferGetPage(buffer);
+ 	Buffer		lmbuffer = InvalidBuffer;
  	OffsetNumber offnum,
  				maxoff;
  	PruneState	prstate;
  
+ 	if (RelationNeedsWAL(relation))
+ 		lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer);
+ 
  	/*
  	 * Our strategy is to scan the page and make lists of items to change,
  	 * then apply the changes within a critical section.
This keeps as much *************** heap_page_prune(Relation relation, Buffe *** 262,267 **** --- 267,274 ---- prstate.latestRemovedXid); PageSetLSN(BufferGetPage(buffer), recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } } else *************** heap_page_prune(Relation relation, Buffe *** 286,291 **** --- 293,301 ---- END_CRIT_SECTION(); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * If requested, report the number of tuples reclaimed to pgstats. This is * ndeleted minus ndead, because we don't want to count a now-DEAD root diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index af5c158..0e10567 100644 *** a/src/backend/access/heap/rewriteheap.c --- b/src/backend/access/heap/rewriteheap.c *************** *** 109,114 **** --- 109,115 ---- #include "access/heapam.h" #include "access/heapam_xlog.h" + #include "access/lsnmap.h" #include "access/rewriteheap.h" #include "access/transam.h" #include "access/tuptoaster.h" *************** typedef struct RewriteStateData *** 143,148 **** --- 144,150 ---- Page rs_buffer; /* page currently being built */ BlockNumber rs_blockno; /* block where page will go */ bool rs_buffer_valid; /* T if any tuples in buffer */ + Buffer rs_lmbuffer; /* LSN map buffer */ bool rs_use_wal; /* must we WAL-log inserts? */ bool rs_logical_rewrite; /* do we need to do logical rewriting */ TransactionId rs_oldest_xmin; /* oldest xmin used by caller to *************** begin_heap_rewrite(Relation old_heap, Re *** 272,277 **** --- 274,280 ---- /* new_heap needn't be empty, just locked */ state->rs_blockno = RelationGetNumberOfBlocks(new_heap); state->rs_buffer_valid = false; + state->rs_lmbuffer = InvalidBuffer; state->rs_use_wal = use_wal; state->rs_oldest_xmin = oldest_xmin; state->rs_freeze_xid = freeze_xid; *************** end_heap_rewrite(RewriteState state) *** 332,342 **** if (state->rs_buffer_valid) { if (state->rs_use_wal) ! 
log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! state->rs_buffer, ! true); RelationOpenSmgr(state->rs_new_rel); PageSetChecksumInplace(state->rs_buffer, state->rs_blockno); --- 335,350 ---- if (state->rs_buffer_valid) { if (state->rs_use_wal) ! { ! XLogRecPtr recptr; ! lsnmap_pin(state->rs_new_rel, state->rs_blockno, &state->rs_lmbuffer); ! recptr = log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! state->rs_buffer, ! true); ! lsnmap_set(state->rs_new_rel, state->rs_blockno, state->rs_lmbuffer, recptr); ! } RelationOpenSmgr(state->rs_new_rel); PageSetChecksumInplace(state->rs_buffer, state->rs_blockno); *************** end_heap_rewrite(RewriteState state) *** 361,366 **** --- 369,378 ---- logical_end_heap_rewrite(state); + /* release the LSN map buffer*/ + if (state->rs_lmbuffer != InvalidBuffer) + ReleaseBuffer(state->rs_lmbuffer); + /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); } *************** raw_heap_insert(RewriteState state, Heap *** 681,691 **** /* XLOG stuff */ if (state->rs_use_wal) ! log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! page, ! true); /* * Now write the page. We say isTemp = true even if it's not a --- 693,708 ---- /* XLOG stuff */ if (state->rs_use_wal) ! { ! XLogRecPtr recptr; ! lsnmap_pin(state->rs_new_rel, state->rs_blockno, &state->rs_lmbuffer); ! recptr = log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! page, ! true); ! lsnmap_set(state->rs_new_rel, state->rs_blockno, state->rs_lmbuffer, recptr); ! } /* * Now write the page. 
We say isTemp = true even if it's not a diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index ce398fc..979b649 100644 *** a/src/backend/catalog/storage.c --- b/src/backend/catalog/storage.c *************** *** 20,25 **** --- 20,26 ---- #include "postgres.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" *************** RelationTruncate(Relation rel, BlockNumb *** 228,233 **** --- 229,235 ---- { bool fsm; bool vm; + bool lm; /* Open it at the smgr level if not already done */ RelationOpenSmgr(rel); *************** RelationTruncate(Relation rel, BlockNumb *** 238,243 **** --- 240,246 ---- rel->rd_smgr->smgr_targblock = InvalidBlockNumber; rel->rd_smgr->smgr_fsm_nblocks = InvalidBlockNumber; rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber; + rel->rd_smgr->smgr_lm_nblocks = InvalidBlockNumber; /* Truncate the FSM first if it exists */ fsm = smgrexists(rel->rd_smgr, FSM_FORKNUM); *************** RelationTruncate(Relation rel, BlockNumb *** 249,254 **** --- 252,262 ---- if (vm) visibilitymap_truncate(rel, nblocks); + /* Truncate the LSN map too if it exists. */ + lm = smgrexists(rel->rd_smgr, LSNMAP_FORKNUM); + if (lm) + lsnmap_truncate(rel, nblocks); + /* * We WAL-log the truncation before actually truncating, which means * trouble if the truncation fails. If we then crash, the WAL replay diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 66d5083..e805324 100644 *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** ATExecSetTableSpace(Oid tableOid, Oid ne *** 9299,9305 **** /* copy those extra forks that exist */ for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) { ! 
if (smgrexists(rel->rd_smgr, forkNum)) { smgrcreate(dstrel, forkNum, false); copy_relation_data(rel->rd_smgr, dstrel, forkNum, --- 9299,9306 ---- /* copy those extra forks that exist */ for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) { ! /* LSN map need to be skipped as it contains invalid data */ ! if (forkNum != LSNMAP_FORKNUM && smgrexists(rel->rd_smgr, forkNum)) { smgrcreate(dstrel, forkNum, false); copy_relation_data(rel->rd_smgr, dstrel, forkNum, *************** ATExecSetTableSpace(Oid tableOid, Oid ne *** 9307,9312 **** --- 9308,9315 ---- } } + /* TODO: build a correct LSN map here */ + /* drop old relation, and close new one */ RelationDropStorage(rel); smgrclose(dstrel); diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c index e653bbd..b0d24d7 100644 *** a/src/backend/commands/vacuumlazy.c --- b/src/backend/commands/vacuumlazy.c *************** *** 41,46 **** --- 41,47 ---- #include "access/heapam.h" #include "access/heapam_xlog.h" #include "access/htup_details.h" + #include "access/lsnmap.h" #include "access/multixact.h" #include "access/transam.h" #include "access/visibilitymap.h" *************** static void lazy_cleanup_index(Relation *** 146,152 **** IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); --- 147,153 ---- IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! 
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer, Buffer *lmbuffer); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); *************** lazy_scan_heap(Relation onerel, LVRelSta *** 456,462 **** IndexBulkDeleteResult **indstats; int i; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer; BlockNumber next_not_all_visible_block; bool skipping_all_visible_blocks; xl_heap_freeze_tuple *frozen; --- 457,464 ---- IndexBulkDeleteResult **indstats; int i; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; BlockNumber next_not_all_visible_block; bool skipping_all_visible_blocks; xl_heap_freeze_tuple *frozen; *************** lazy_scan_heap(Relation onerel, LVRelSta *** 618,623 **** --- 620,628 ---- vacrelstats->num_index_scans++; } + if (RelationNeedsWAL(onerel)) + lsnmap_pin(onerel, blkno, &lmbuffer); + /* * Pin the visibility map page in case we need to mark the page * all-visible. In most cases this will be very cheap, because we'll *************** lazy_scan_heap(Relation onerel, LVRelSta *** 966,971 **** --- 971,978 ---- recptr = log_heap_freeze(onerel, buf, FreezeLimit, frozen, nfrozen); PageSetLSN(page, recptr); + + lsnmap_set(onerel, BufferGetBlockNumber(buf), lmbuffer, recptr); } END_CRIT_SECTION(); *************** lazy_scan_heap(Relation onerel, LVRelSta *** 979,985 **** vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer); has_dead_tuples = false; /* --- 986,992 ---- vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! 
lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer, &lmbuffer); has_dead_tuples = false; /* *************** lazy_scan_heap(Relation onerel, LVRelSta *** 1090,1095 **** --- 1097,1107 ---- ReleaseBuffer(vmbuffer); vmbuffer = InvalidBuffer; } + if (BufferIsValid(lmbuffer)) + { + ReleaseBuffer(lmbuffer); + lmbuffer = InvalidBuffer; + } /* If any tuples need to be deleted, perform final vacuum cycle */ /* XXX put a threshold on min number of tuples here? */ *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1170,1176 **** int tupindex; int npages; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer; pg_rusage_init(&ru0); npages = 0; --- 1182,1189 ---- int tupindex; int npages; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; pg_rusage_init(&ru0); npages = 0; *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1195,1201 **** continue; } tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, ! &vmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); --- 1208,1214 ---- continue; } tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, ! &vmbuffer, &lmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1211,1216 **** --- 1224,1234 ---- ReleaseBuffer(vmbuffer); vmbuffer = InvalidBuffer; } + if (BufferIsValid(lmbuffer)) + { + ReleaseBuffer(lmbuffer); + lmbuffer = InvalidBuffer; + } ereport(elevel, (errmsg("\"%s\": removed %d row versions in %d pages", *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1232,1244 **** */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! 
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; TransactionId visibility_cutoff_xid; START_CRIT_SECTION(); for (; tupindex < vacrelstats->num_dead_tuples; tupindex++) --- 1250,1265 ---- */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer, Buffer *lmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; TransactionId visibility_cutoff_xid; + if (RelationNeedsWAL(onerel)) + lsnmap_pin(onerel, blkno, lmbuffer); + START_CRIT_SECTION(); for (; tupindex < vacrelstats->num_dead_tuples; tupindex++) *************** lazy_vacuum_page(Relation onerel, BlockN *** 1273,1278 **** --- 1294,1301 ---- unused, uncnt, vacrelstats->latestRemovedXid); PageSetLSN(page, recptr); + + lsnmap_set(onerel, BufferGetBlockNumber(buffer), *lmbuffer, recptr); } /* diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 244b4ea..882dcbe 100644 *** a/src/backend/storage/smgr/smgr.c --- b/src/backend/storage/smgr/smgr.c *************** smgropen(RelFileNode rnode, BackendId ba *** 168,173 **** --- 168,174 ---- reln->smgr_targblock = InvalidBlockNumber; reln->smgr_fsm_nblocks = InvalidBlockNumber; reln->smgr_vm_nblocks = InvalidBlockNumber; + reln->smgr_lm_nblocks = InvalidBlockNumber; reln->smgr_which = 0; /* we only have md.c at present */ /* mark it not open */ diff --git a/src/common/relpath.c b/src/common/relpath.c index 66dfef1..8d52be7 100644 *** a/src/common/relpath.c --- b/src/common/relpath.c *************** const char *const forkNames[] = { *** 35,41 **** "main", /* MAIN_FORKNUM */ "fsm", /* FSM_FORKNUM */ "vm", /* VISIBILITYMAP_FORKNUM */ ! "init" /* INIT_FORKNUM */ }; /* --- 35,42 ---- "main", /* MAIN_FORKNUM */ "fsm", /* FSM_FORKNUM */ "vm", /* VISIBILITYMAP_FORKNUM */ ! "init", /* INIT_FORKNUM */ ! 
"lm" /* LSNMAP_FORKNUM*/ }; /* *************** forkname_to_number(const char *forkName) *** 58,64 **** (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid fork name"), errhint("Valid fork names are \"main\", \"fsm\", " ! "\"vm\", and \"init\"."))); #endif return InvalidForkNumber; --- 59,65 ---- (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid fork name"), errhint("Valid fork names are \"main\", \"fsm\", " ! "\"vm\", \"init\" and \"lm\"."))); #endif return InvalidForkNumber; diff --git a/src/include/access/hio.h b/src/include/access/hio.h index b014029..1ac5762 100644 *** a/src/include/access/hio.h --- b/src/include/access/hio.h *************** extern void RelationPutHeapTuple(Relatio *** 40,45 **** extern Buffer RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other); #endif /* HIO_H */ --- 40,46 ---- extern Buffer RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other, ! 
Buffer *lmbuffer, Buffer *lmbuffer_other); #endif /* HIO_H */ diff --git a/src/include/access/lsnmap.h b/src/include/access/lsnmap.h index ...e61bbc3 100644 *** a/src/include/access/lsnmap.h --- b/src/include/access/lsnmap.h *************** *** 0 **** --- 1,28 ---- + /*------------------------------------------------------------------------- + * + * lsnmap.h + * lsn map interface + * + * + * Portions Copyright (c) 2007-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/lsnmap.h + * + *------------------------------------------------------------------------- + */ + #ifndef LSNMAP_H + #define LSNMAP_H + + #include "access/xlogdefs.h" + #include "storage/block.h" + #include "storage/buf.h" + #include "utils/relcache.h" + + extern void lsnmap_pin(Relation rel, BlockNumber heapBlk, + Buffer *lmbuf); + extern void lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf, + XLogRecPtr lsn); + extern void lsnmap_truncate(Relation rel, BlockNumber nheapblocks); + + #endif /* LSNMAP_H */ diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h index a263779..0c90191 100644 *** a/src/include/common/relpath.h --- b/src/include/common/relpath.h *************** typedef enum ForkNumber *** 27,33 **** MAIN_FORKNUM = 0, FSM_FORKNUM, VISIBILITYMAP_FORKNUM, ! INIT_FORKNUM /* * NOTE: if you add a new fork, change MAX_FORKNUM and possibly --- 27,34 ---- MAIN_FORKNUM = 0, FSM_FORKNUM, VISIBILITYMAP_FORKNUM, ! INIT_FORKNUM, ! LSNMAP_FORKNUM /* * NOTE: if you add a new fork, change MAX_FORKNUM and possibly *************** typedef enum ForkNumber *** 36,42 **** */ } ForkNumber; ! #define MAX_FORKNUM INIT_FORKNUM #define FORKNAMECHARS 4 /* max chars for a fork name */ --- 37,43 ---- */ } ForkNumber; ! 
#define MAX_FORKNUM LSNMAP_FORKNUM #define FORKNAMECHARS 4 /* max chars for a fork name */ diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 69a624f..f40532f 100644 *** a/src/include/storage/smgr.h --- b/src/include/storage/smgr.h *************** typedef struct SMgrRelationData *** 55,60 **** --- 55,61 ---- BlockNumber smgr_targblock; /* current insertion target block */ BlockNumber smgr_fsm_nblocks; /* last known size of fsm fork */ BlockNumber smgr_vm_nblocks; /* last known size of vm fork */ + BlockNumber smgr_lm_nblocks; /* last known size of lm fork */ /* additional public fields may someday exist here */ -- 2.2.0
signature.asc
Description: OpenPGP digital signature