Hi,
> While replaying the delete/vacuum record on standby, it can conflict
> with some already running queries. Basically the replay can remove
> some row which can be visible on standby. You need to resolve
> conflicts similar to what we do in btree delete records (refer
> btree_xlog_delete).
Agreed. Thanks for pointing this out. I have taken care of it in the
attached v2 patch.
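Concretely, the replay routine for the new XLOG_HASH_VACUUM_ONE_PAGE
record resolves the conflict up front, before it touches the page;
abridged from hash_xlog_vacuum_one_page() in the attached patch:

    if (InHotStandby)
    {
        /* cutoff xid derived from the heap tuples the index entries point at */
        TransactionId latestRemovedXid =
            hash_xlog_vacuum_get_latestRemovedXid(record);
        RelFileNode rnode;

        /* block 1 of the record is the bucket/overflow page being vacuumed */
        XLogRecGetBlockTag(record, 1, &rnode, NULL, NULL);

        /* cancel standby queries whose snapshots could still see the removed rows */
        ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
    }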
> + /*
> + * Write-lock the meta page so that we can decrement
> + * tuple count.
> + */
> + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
> +
> + _hash_vacuum_one_page(rel, metabuf, buf, bucket_buf,
> + (buf == bucket_buf) ? true : false);
> +
> + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
>
> It seems here meta page lock is acquired for duration more than
> required and also it is not required when there are no deletable items
> on page. You can take the metapage lock before decrementing the count.
Ok. Corrected. Please refer to the attached v2 patch.
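The meta page is now write-locked inside _hash_vacuum_one_page() itself,
only once deletable items have actually been found and only around the
tuple-count decrement; abridged from the attached patch:

    if (ndeletable > 0)
    {
        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        PageIndexMultiDelete(page, deletable, ndeletable);
        /* ... clear LH_PAGE_HAS_DEAD_TUPLES on the page ... */

        /* Write-lock the meta page only now, just to decrement the count */
        _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

        metap = HashPageGetMeta(BufferGetPage(metabuf));
        metap->hashm_ntuples -= tuples_removed;

        MarkBufferDirty(buf);
        MarkBufferDirty(metabuf);

        /* ... WAL-log the deletion as XLOG_HASH_VACUUM_ONE_PAGE ... */

        END_CRIT_SECTION();

        /* Release the meta page lock once the change has been logged */
        _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
    }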
> Spurious space. There are some other similar spurious white space
> changes in patch, remove them as well.
Corrected. Please refer to the attached v2 patch.
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index db73f05..4a4d614 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -157,7 +157,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
if (buildstate.spool)
{
/* sort the tuples and insert them into the index */
- _h_indexbuild(buildstate.spool);
+ _h_indexbuild(buildstate.spool, heap->rd_node);
_h_spooldestroy(buildstate.spool);
}
@@ -196,6 +196,8 @@ hashbuildCallback(Relation index,
Datum index_values[1];
bool index_isnull[1];
IndexTuple itup;
+ Relation rel;
+ RelFileNode rnode;
/* convert data to a hash key; on failure, do not insert anything */
if (!_hash_convert_tuple(index,
@@ -212,8 +214,12 @@ hashbuildCallback(Relation index,
/* form an index tuple and point it at the heap tuple */
itup = index_form_tuple(RelationGetDescr(index),
index_values, index_isnull);
+ /* Get the RelFileNode from the relation OID */
+ rel = relation_open(htup->t_tableOid, NoLock);
+ rnode = rel->rd_node;
+ relation_close(rel, NoLock);
itup->t_tid = htup->t_self;
- _hash_doinsert(index, itup);
+ _hash_doinsert(index, itup, rnode);
pfree(itup);
}
@@ -245,7 +251,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull,
itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
itup->t_tid = *ht_ctid;
- _hash_doinsert(rel, itup);
+ _hash_doinsert(rel, itup, heapRel->rd_node);
pfree(itup);
@@ -325,14 +331,21 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
if (scan->kill_prior_tuple)
{
/*
- * Yes, so mark it by setting the LP_DEAD state in the item flags.
+ * Yes, so remember it for later. (We'll deal with all such
+ * tuples at once right after leaving the index page or at
+ * end of scan.)
*/
- ItemIdMarkDead(PageGetItemId(page, offnum));
+ if (so->killedItems == NULL)
+ so->killedItems = palloc(MaxIndexTuplesPerPage *
+ sizeof(HashScanPosItem));
- /*
- * Since this can be redone later if needed, mark as a hint.
- */
- MarkBufferDirtyHint(buf, true);
+ if (so->numKilled < MaxIndexTuplesPerPage)
+ {
+ so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
+ so->killedItems[so->numKilled].indexOffset =
+ ItemPointerGetOffsetNumber(&(so->hashso_curpos));
+ so->numKilled++;
+ }
}
/*
@@ -439,6 +452,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
so->hashso_skip_moved_tuples = false;
+ so->killedItems = NULL;
+ so->numKilled = 0;
+
scan->opaque = so;
return scan;
@@ -454,6 +470,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ hashkillitems(scan);
+
_hash_dropscanbuf(rel, so);
/* set position invalid (this will cause _hash_first call) */
@@ -480,6 +500,10 @@ hashendscan(IndexScanDesc scan)
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ hashkillitems(scan);
+
_hash_dropscanbuf(rel, so);
pfree(so);
@@ -809,6 +833,15 @@ hashbucketcleanup(Relation rel, Buffer bucket_buf,
PageIndexMultiDelete(page, deletable, ndeletable);
bucket_dirty = true;
+ /*
+ * If this vacuum pass removed any DEAD tuples from the index page,
+ * mark the page as clean again by clearing its
+ * LH_PAGE_HAS_DEAD_TUPLES flag.
+ */
+ if (tuples_removed && *tuples_removed > 0 &&
+ opaque->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
+ opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
MarkBufferDirty(buf);
/* XLOG stuff */
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d030a8d..c6dc20b 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -14,8 +14,13 @@
*/
#include "postgres.h"
+#include "access/heapam_xlog.h"
#include "access/hash_xlog.h"
#include "access/xlogutils.h"
+#include "access/xlog.h"
+#include "access/transam.h"
+#include "storage/procarray.h"
+#include "miscadmin.h"
/*
* replay a hash index meta page
@@ -921,6 +926,247 @@ hash_xlog_update_meta_page(XLogReaderState *record)
UnlockReleaseBuffer(metabuf);
}
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted. This puts the work for calculating latestRemovedXid
+ * into the recovery path rather than the primary path.
+ *
+ * It's possible that this generates a fair amount of I/O, since an index
+ * block may have hundreds of tuples being deleted. Repeat accesses to the
+ * same heap blocks are common, though are not yet optimised.
+ */
+static TransactionId
+hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
+{
+ xl_hash_vacuum *xlrec = (xl_hash_vacuum *) XLogRecGetData(record);
+ OffsetNumber *unused;
+ Buffer ibuffer,
+ hbuffer;
+ Page ipage,
+ hpage;
+ RelFileNode rnode;
+ BlockNumber blkno;
+ ItemId iitemid,
+ hitemid;
+ IndexTuple itup;
+ HeapTupleHeader htuphdr;
+ BlockNumber hblkno;
+ OffsetNumber hoffnum;
+ TransactionId latestRemovedXid = InvalidTransactionId;
+ int i;
+ char *ptr;
+ Size len;
+
+ /*
+ * If there's nothing running on the standby we don't need to derive a
+ * full latestRemovedXid value, so use a fast path out of here. This
+ * returns InvalidTransactionId, and so will conflict with all HS
+ * transactions; but since we just worked out that that's zero people,
+ * it's OK.
+ */
+ if (CountDBBackends(InvalidOid) == 0)
+ return latestRemovedXid;
+
+ /*
+ * Get index page. If the DB is consistent, this should not fail, nor
+ * should any of the heap page fetches below. If one does, we return
+ * InvalidTransactionId to cancel all HS transactions. That's probably
+ * overkill, but it's safe, and certainly better than panicking here.
+ */
+ XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno);
+ ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
+
+ if (!BufferIsValid(ibuffer))
+ return InvalidTransactionId;
+ LockBuffer(ibuffer, HASH_READ);
+ ipage = (Page) BufferGetPage(ibuffer);
+
+ /*
+ * Loop through the deleted index items to obtain the TransactionId from
+ * the heap items they point to.
+ */
+ ptr = XLogRecGetBlockData(record, 1, &len);
+
+ unused = (OffsetNumber *) ptr;
+
+ for (i = 0; i < xlrec->ntuples; i++)
+ {
+ /*
+ * Identify the index tuple about to be deleted.
+ */
+ iitemid = PageGetItemId(ipage, unused[i]);
+ itup = (IndexTuple) PageGetItem(ipage, iitemid);
+
+ /*
+ * Locate the heap page that the index tuple points at
+ */
+ hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
+ hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
+ hblkno, RBM_NORMAL);
+
+ if (!BufferIsValid(hbuffer))
+ {
+ UnlockReleaseBuffer(ibuffer);
+ return InvalidTransactionId;
+ }
+ LockBuffer(hbuffer, HASH_READ);
+ hpage = (Page) BufferGetPage(hbuffer);
+
+ /*
+ * Look up the heap tuple header that the index tuple points at by
+ * using the heap node supplied with the xlrec. We can't use
+ * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
+ * Note that we are not looking at tuple data here, just headers.
+ */
+ hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
+ hitemid = PageGetItemId(hpage, hoffnum);
+
+ /*
+ * Follow any redirections until we find something useful.
+ */
+ while (ItemIdIsRedirected(hitemid))
+ {
+ hoffnum = ItemIdGetRedirect(hitemid);
+ hitemid = PageGetItemId(hpage, hoffnum);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * If the heap item has storage, then read the header and use that to
+ * set latestRemovedXid.
+ *
+ * Some LP_DEAD items may not be accessible, so we ignore them.
+ */
+ if (ItemIdHasStorage(hitemid))
+ {
+ htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
+ HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+ }
+ else if (ItemIdIsDead(hitemid))
+ {
+ /*
+ * Conjecture: if hitemid is dead then it had xids before the xids
+ * marked on LP_NORMAL items. So we just ignore this item and move
+ * onto the next, for the purposes of calculating
+ * latestRemovedXid.
+ */
+ }
+ else
+ Assert(!ItemIdIsUsed(hitemid));
+
+ UnlockReleaseBuffer(hbuffer);
+ }
+
+ UnlockReleaseBuffer(ibuffer);
+
+ /*
+ * If all heap tuples were LP_DEAD then we will be returning
+ * InvalidTransactionId here, which avoids conflicts. This matches
+ * existing logic which assumes that LP_DEAD tuples must already be older
+ * than the latestRemovedXid on the cleanup record that set them as
+ * LP_DEAD, hence must already have generated a conflict.
+ */
+ return latestRemovedXid;
+}
+
+/*
+ * replay delete operation in hash index to remove
+ * tuples marked as DEAD during index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_vacuum *xldata = (xl_hash_vacuum *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer buffer;
+ Buffer metabuf;
+ Page page;
+ XLogRedoAction action;
+
+ /*
+ * If we have any conflict processing to do, it must happen before we
+ * update the page.
+ *
+ * Hash index delete records can conflict with standby queries. You might
+ * think that vacuum records would conflict as well, but we've handled
+ * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+ * cleaned by the vacuum of the heap and so we can resolve any conflicts
+ * just once when that arrives. After that we know that no conflicts
+ * exist from individual hash index vacuum records on that index.
+ */
+ if (InHotStandby)
+ {
+ TransactionId latestRemovedXid =
+ hash_xlog_vacuum_get_latestRemovedXid(record);
+ RelFileNode rnode;
+
+ XLogRecGetBlockTag(record, 1, &rnode, NULL, NULL);
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+ }
+
+ if (xldata->is_primary_bucket_page)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL,
+ true, &buffer);
+ else
+ {
+ RelFileNode rnode;
+ BlockNumber blkno;
+
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+ bucketbuf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+ RBM_NORMAL);
+
+ if (BufferIsValid(bucketbuf))
+ LockBufferForCleanup(bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &buffer);
+ }
+
+ if (action == BLK_NEEDS_REDO)
+ {
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 1, &len);
+
+ page = (Page) BufferGetPage(buffer);
+
+ if (len > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
+ {
+ Page metapage;
+ HashMetaPage metap;
+
+ metapage = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(metapage);
+
+ metap->hashm_ntuples -= xldata->ntuples;
+
+ PageSetLSN(metapage, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
void
hash_redo(XLogReaderState *record)
{
@@ -964,6 +1210,9 @@ hash_redo(XLogReaderState *record)
case XLOG_HASH_UPDATE_META_PAGE:
hash_xlog_update_meta_page(record);
break;
+ case XLOG_HASH_VACUUM_ONE_PAGE:
+ hash_xlog_vacuum_one_page(record);
+ break;
default:
elog(PANIC, "hash_redo: unknown op code %u", info);
}
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 3514138..7435db0 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -19,7 +19,12 @@
#include "access/hash_xlog.h"
#include "miscadmin.h"
#include "utils/rel.h"
+#include "storage/lwlock.h"
+#include "storage/buf_internals.h"
+static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+ Buffer bucket_buf, bool is_primary_bucket_page,
+ RelFileNode hnode);
/*
* _hash_doinsert() -- Handle insertion of a single index tuple.
@@ -28,7 +33,7 @@
* and hashinsert. By here, itup is completely filled in.
*/
void
-_hash_doinsert(Relation rel, IndexTuple itup)
+_hash_doinsert(Relation rel, IndexTuple itup, RelFileNode hnode)
{
Buffer buf = InvalidBuffer;
Buffer bucket_buf;
@@ -206,6 +211,22 @@ _hash_doinsert(Relation rel, IndexTuple itup)
while (PageGetFreeSpace(page) < itemsz)
{
/*
+ * Check if current page has any DEAD tuples. If yes,
+ * delete these tuples and see if we can get a space for
+ * the new item to be inserted before moving to the next
+ * page in the bucket chain.
+ */
+ if (H_HAS_DEAD_TUPLES(pageopaque) && CheckBufferForCleanup(bucket_buf))
+ {
+ _hash_vacuum_one_page(rel, metabuf, buf, bucket_buf,
+ (buf == bucket_buf) ? true : false,
+ hnode);
+
+ if (PageGetFreeSpace(page) >= itemsz)
+ break; /* OK, now we have enough space */
+ }
+
+ /*
* no space on this page; check for an overflow page
*/
BlockNumber nextblkno = pageopaque->hasho_nextblkno;
@@ -247,7 +268,8 @@ _hash_doinsert(Relation rel, IndexTuple itup)
Assert(PageGetFreeSpace(page) >= itemsz);
}
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
- Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
+ Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE ||
+ pageopaque->hasho_flag == (LH_OVERFLOW_PAGE | LH_PAGE_HAS_DEAD_TUPLES));
Assert(pageopaque->hasho_bucket == bucket);
}
@@ -390,3 +412,98 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
RelationGetRelationName(rel));
}
}
+
+/*
+ * _hash_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page. The caller must
+ * hold a cleanup lock on the primary bucket page before calling this.
+ */
+
+static void
+_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+ Buffer bucket_buf, bool is_primary_bucket_page,
+ RelFileNode hnode)
+{
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable = 0;
+ OffsetNumber offnum,
+ maxoff;
+ Page page = BufferGetPage(buf);
+ HashPageOpaque pageopaque;
+ HashMetaPage metap;
+ double tuples_removed = 0;
+
+ /* Scan each tuple in page to see if it is marked as LP_DEAD */
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemId = PageGetItemId(page, offnum);
+
+ if (ItemIdIsDead(itemId))
+ {
+ deletable[ndeletable++] = offnum;
+ tuples_removed += 1;
+ }
+ }
+
+ if (ndeletable > 0)
+ {
+ /* No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ PageIndexMultiDelete(page, deletable, ndeletable);
+
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+ /*
+ * Write-lock the meta page so that we can decrement
+ * tuple count.
+ */
+ _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+ metap->hashm_ntuples -= tuples_removed;
+
+ MarkBufferDirty(buf);
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_vacuum xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.hnode = hnode;
+ xlrec.is_primary_bucket_page = is_primary_bucket_page;
+ xlrec.ntuples = tuples_removed;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashVacuum);
+
+ /*
+ * The primary bucket buffer needs to be registered so that replay
+ * takes a cleanup lock on it.
+ */
+ if (!xlrec.is_primary_bucket_page)
+ XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+ XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(1, (char *) deletable,
+ ndeletable * sizeof(OffsetNumber));
+
+ XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+ _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+ }
+}
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 0df64a8..316f891 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -473,6 +473,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
break; /* yes, so exit for-loop */
}
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ hashkillitems(scan);
+
/*
* ran off the end of this page, try the next
*/
@@ -562,6 +566,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
break; /* yes, so exit for-loop */
}
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ hashkillitems(scan);
+
/*
* ran off the end of this page, try the next
*/
diff --git a/src/backend/access/hash/hashsort.c b/src/backend/access/hash/hashsort.c
index 8938ab5..aa4c7b7 100644
--- a/src/backend/access/hash/hashsort.c
+++ b/src/backend/access/hash/hashsort.c
@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
* create an entire index.
*/
void
-_h_indexbuild(HSpool *hspool)
+_h_indexbuild(HSpool *hspool, RelFileNode rnode)
{
IndexTuple itup;
bool should_free;
@@ -128,7 +128,7 @@ _h_indexbuild(HSpool *hspool)
Assert(hashkey >= lasthashkey);
#endif
- _hash_doinsert(hspool->index, itup);
+ _hash_doinsert(hspool->index, itup, rnode);
if (should_free)
pfree(itup);
}
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index b5164d7..4350e32 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -19,6 +19,7 @@
#include "access/relscan.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
+#include "storage/buf_internals.h"
/*
@@ -489,3 +490,72 @@ _hash_get_newbucket(Relation rel, Bucket curr_bucket,
return new_bucket;
}
+
+/*
+ * hashkillitems - set LP_DEAD state for items an indexscan caller has
+ * told us were killed.
+ *
+ * scan->opaque, referenced locally through so, contains information about the
+ * current page and killed tuples thereon (generally, this should only be
+ * called if so->numKilled > 0).
+ *
+ * We match items by heap TID before assuming they are the right ones to
+ * delete. If an item has moved off the current page due to a split, we'll
+ * fail to find it and do nothing (this is not an error case --- we assume
+ * the item will eventually get marked in a future indexscan).
+ */
+void
+hashkillitems(IndexScanDesc scan)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ Page page;
+ HashPageOpaque opaque;
+ OffsetNumber offnum, maxoff;
+ int numKilled = so->numKilled;
+ int i;
+ bool killedsomething = false;
+
+ Assert(so->numKilled > 0);
+ Assert(so->killedItems != NULL);
+
+ /*
+ * Always reset the scan state, so we don't look for the same
+ * items on other pages.
+ */
+ so->numKilled = 0;
+
+ page = BufferGetPage(so->hashso_curbuf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ for (i = 0; i < numKilled; i++)
+ {
+ offnum = so->killedItems[i].indexOffset;
+
+ while (offnum <= maxoff)
+ {
+ ItemId iid = PageGetItemId(page, offnum);
+ IndexTuple ituple = (IndexTuple) PageGetItem(page, iid);
+
+ if (ItemPointerEquals(&ituple->t_tid, &so->killedItems[i].heapTid))
+ {
+ /* found the item */
+ ItemIdMarkDead(iid);
+ killedsomething = true;
+ break; /* out of inner search loop */
+ }
+ offnum = OffsetNumberNext(offnum);
+ }
+ }
+
+ /*
+ * Since this can be redone later if needed, mark as dirty hint.
+ * Whenever we mark anything LP_DEAD, we also set the page's
+ * LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
+ */
+ if (killedsomething)
+ {
+ opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
+ MarkBufferDirtyHint(so->hashso_curbuf, true);
+ }
+}
diff --git a/src/backend/access/rmgrdesc/hashdesc.c b/src/backend/access/rmgrdesc/hashdesc.c
index 245ce97..7fc5721 100644
--- a/src/backend/access/rmgrdesc/hashdesc.c
+++ b/src/backend/access/rmgrdesc/hashdesc.c
@@ -155,6 +155,8 @@ hash_identify(uint8 info)
case XLOG_HASH_UPDATE_META_PAGE:
id = "UPDATE_META_PAGE";
break;
+ case XLOG_HASH_VACUUM_ONE_PAGE:
+ id = "VACUUM_ONE_PAGE";
}
return id;
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index c0434f5..6fc7cd0 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -57,6 +57,7 @@ typedef uint32 Bucket;
#define LH_BUCKET_NEW_PAGE_SPLIT (1 << 4)
#define LH_BUCKET_OLD_PAGE_SPLIT (1 << 5)
#define LH_BUCKET_PAGE_HAS_GARBAGE (1 << 6)
+#define LH_PAGE_HAS_DEAD_TUPLES (1 << 7)
typedef struct HashPageOpaqueData
{
@@ -74,6 +75,7 @@ typedef HashPageOpaqueData *HashPageOpaque;
#define H_NEW_INCOMPLETE_SPLIT(opaque) ((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT)
#define H_INCOMPLETE_SPLIT(opaque) (((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT) || \
((opaque)->hasho_flag & LH_BUCKET_OLD_PAGE_SPLIT))
+#define H_HAS_DEAD_TUPLES(opaque) ((opaque)->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
/*
* The page ID is for the convenience of pg_filedump and similar utilities,
@@ -83,6 +85,13 @@ typedef HashPageOpaqueData *HashPageOpaque;
*/
#define HASHO_PAGE_ID 0xFF80
+typedef struct HashScanPosItem /* what we remember about each match */
+{
+ ItemPointerData heapTid; /* TID of referenced heap item */
+ OffsetNumber indexOffset; /* index item's location within page */
+} HashScanPosItem;
+
+
/*
* HashScanOpaqueData is private state for a hash index scan.
*/
@@ -116,6 +125,10 @@ typedef struct HashScanOpaqueData
/* Whether scan needs to skip tuples that are moved by split */
bool hashso_skip_moved_tuples;
+
+ /* info about killed items if any (killedItems is NULL if never used) */
+ HashScanPosItem *killedItems; /* tids and offset numbers of killed items */
+ int numKilled; /* number of currently stored items */
} HashScanOpaqueData;
typedef HashScanOpaqueData *HashScanOpaque;
@@ -177,6 +190,7 @@ typedef struct HashMetaPageData
typedef HashMetaPageData *HashMetaPage;
+
/*
* Maximum size of a hash index item (it's okay to have only one per page)
*/
@@ -303,7 +317,7 @@ extern Datum hash_uint32(uint32 k);
/* private routines */
/* hashinsert.c */
-extern void _hash_doinsert(Relation rel, IndexTuple itup);
+extern void _hash_doinsert(Relation rel, IndexTuple itup, RelFileNode hnode);
extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
Size itemsize, IndexTuple itup);
extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
@@ -361,7 +375,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets);
extern void _h_spooldestroy(HSpool *hspool);
extern void _h_spool(HSpool *hspool, ItemPointer self,
Datum *values, bool *isnull);
-extern void _h_indexbuild(HSpool *hspool);
+extern void _h_indexbuild(HSpool *hspool, RelFileNode rnode);
/* hashutil.c */
extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
@@ -381,6 +395,7 @@ extern BlockNumber _hash_get_oldblk(Relation rel, HashPageOpaque opaque);
extern BlockNumber _hash_get_newblk(Relation rel, HashPageOpaque opaque);
extern Bucket _hash_get_newbucket(Relation rel, Bucket curr_bucket,
uint32 lowmask, uint32 maxbucket);
+extern void hashkillitems(IndexScanDesc scan);
/* hash.c */
extern void hashbucketcleanup(Relation rel, Buffer bucket_buf,
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 30e16c0..b4d2bf2 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -43,6 +43,7 @@
#define XLOG_HASH_UPDATE_META_PAGE 0xB0 /* update meta page after
* vacuum */
+#define XLOG_HASH_VACUUM_ONE_PAGE 0xC0 /* remove dead tuples from index page */
/*
* xl_hash_split_allocpage flag values, 8 bits are available.
@@ -257,6 +258,25 @@ typedef struct xl_hash_init_bitmap_page
#define SizeOfHashInitBitmapPage \
(offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
+/*
+ * This is what we need for index tuple deletion and to
+ * update the meta page.
+ *
+ * This data record is used for XLOG_HASH_VACUUM_ONE_PAGE
+ *
+ * Backup Blk 0/1: bucket page
+ * Backup Blk 2: meta page
+ */
+typedef struct xl_hash_vacuum
+{
+ RelFileNode hnode;
+ double ntuples;
+ bool is_primary_bucket_page;
+} xl_hash_vacuum;
+
+#define SizeOfHashVacuum \
+ (offsetof(xl_hash_vacuum, is_primary_bucket_page) + sizeof(bool))
+
extern void hash_redo(XLogReaderState *record);
extern void hash_desc(StringInfo buf, XLogReaderState *record);
extern const char *hash_identify(uint8 info);
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers