diff --git a/contrib/pgstattuple/pgstattuple.c b/contrib/pgstattuple/pgstattuple.c
index c1122b4..d02539a 100644
--- a/contrib/pgstattuple/pgstattuple.c
+++ b/contrib/pgstattuple/pgstattuple.c
@@ -400,7 +400,6 @@ pgstat_hash_page(pgstattuple_type *stat, Relation rel, BlockNumber blkno,
 	Buffer		buf;
 	Page		page;
 
-	_hash_getlock(rel, blkno, HASH_SHARE);
 	buf = _hash_getbuf_with_strategy(rel, blkno, HASH_READ, 0, bstrategy);
 	page = BufferGetPage(buf);
 
@@ -431,7 +430,6 @@ pgstat_hash_page(pgstattuple_type *stat, Relation rel, BlockNumber blkno,
 	}
 
 	_hash_relbuf(rel, buf);
-	_hash_droplock(rel, blkno, HASH_SHARE);
 }
 
 /*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 49a6c81..f95ac00 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -407,12 +407,15 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 
 	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
 	so->hashso_bucket_valid = false;
-	so->hashso_bucket_blkno = 0;
 	so->hashso_curbuf = InvalidBuffer;
+	so->hashso_bucket_buf = InvalidBuffer;
+	so->hashso_old_bucket_buf = InvalidBuffer;
 	/* set position invalid (this will cause _hash_first call) */
 	ItemPointerSetInvalid(&(so->hashso_curpos));
 	ItemPointerSetInvalid(&(so->hashso_heappos));
 
+	so->hashso_skip_moved_tuples = false;
+
 	scan->opaque = so;
 
 	/* register scan in case we change pages it's using */
@@ -436,10 +439,15 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		_hash_dropbuf(rel, so->hashso_curbuf);
 	so->hashso_curbuf = InvalidBuffer;
 
-	/* release lock on bucket, too */
-	if (so->hashso_bucket_blkno)
-		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
-	so->hashso_bucket_blkno = 0;
+	/* release pin we hold on old primary bucket */
+	if (BufferIsValid(so->hashso_old_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+	so->hashso_old_bucket_buf = InvalidBuffer;
+
+	/* release pin we hold on primary bucket */
+	if (BufferIsValid(so->hashso_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_bucket_buf);
+	so->hashso_bucket_buf = InvalidBuffer;
 
 	/* set position invalid (this will cause _hash_first call) */
 	ItemPointerSetInvalid(&(so->hashso_curpos));
@@ -453,6 +461,8 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 				scan->numberOfKeys * sizeof(ScanKeyData));
 		so->hashso_bucket_valid = false;
 	}
+
+	so->hashso_skip_moved_tuples = false;
 }
 
 /*
@@ -472,10 +482,15 @@ hashendscan(IndexScanDesc scan)
 		_hash_dropbuf(rel, so->hashso_curbuf);
 	so->hashso_curbuf = InvalidBuffer;
 
-	/* release lock on bucket, too */
-	if (so->hashso_bucket_blkno)
-		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
-	so->hashso_bucket_blkno = 0;
+	/* release pin we hold on old primary bucket */
+	if (BufferIsValid(so->hashso_old_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+	so->hashso_old_bucket_buf = InvalidBuffer;
+
+	/* release pin we hold on primary bucket */
+	if (BufferIsValid(so->hashso_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_bucket_buf);
+	so->hashso_bucket_buf = InvalidBuffer;
 
 	pfree(so);
 	scan->opaque = NULL;
@@ -486,6 +501,9 @@ hashendscan(IndexScanDesc scan)
  * The set of target tuples is specified via a callback routine that tells
  * whether any given heap tuple (identified by ItemPointer) is being deleted.
  *
+ * This function also deletes tuples that were moved by a split to another
+ * bucket.
+ *
  * Result: a palloc'd struct containing statistical info for VACUUM displays.
  */
 IndexBulkDeleteResult *
@@ -530,35 +548,61 @@ loop_top:
 	{
 		BlockNumber bucket_blkno;
 		BlockNumber blkno;
+		Buffer		bucket_buf;
+		Buffer		buf;
+		HashPageOpaque bucket_opaque;
+		Page		page;
 		bool		bucket_dirty = false;
+		bool		bucket_has_garbage = false;
 
 		/* Get address of bucket's start page */
 		bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);
 
-		/* Exclusive-lock the bucket so we can shrink it */
-		_hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);
-
 		/* Shouldn't have any active scans locally, either */
 		if (_hash_has_active_scan(rel, cur_bucket))
 			elog(ERROR, "hash index has active scan during VACUUM");
 
-		/* Scan each page in bucket */
 		blkno = bucket_blkno;
-		while (BlockNumberIsValid(blkno))
+
+		/*
+		 * Maintain a cleanup lock on the primary bucket page until we have
+		 * scanned all pages in the bucket.  This ensures that we don't delete
+		 * tuples needed by concurrent scans on buckets where a split is in
+		 * progress, and that a concurrent split can't be started on this
+		 * bucket.  In the future, we might relax this so that vacuum takes a
+		 * cleanup lock only for buckets where a split is in progress;
+		 * however, the squeeze phase still needs a cleanup lock, since
+		 * otherwise squeezing could move tuples to a different location and
+		 * thereby change the order of scan results.
+		 */
+		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+		LockBufferForCleanup(buf);
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
+
+		page = BufferGetPage(buf);
+		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+		/*
+		 * If the bucket contains tuples that were moved by a split, then we
+		 * need to delete those tuples as well.
+		 */
+		if (bucket_opaque->hasho_flag & LH_BUCKET_PAGE_HAS_GARBAGE)
+			bucket_has_garbage = true;
+
+		bucket_buf = buf;
+
+		/* Scan each page in bucket */
+		for (;;)
 		{
-			Buffer		buf;
-			Page		page;
 			HashPageOpaque opaque;
 			OffsetNumber offno;
 			OffsetNumber maxoffno;
 			OffsetNumber deletable[MaxOffsetNumber];
 			int			ndeletable = 0;
+			bool		release_buf = false;
 
 			vacuum_delay_point();
 
-			buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
-										   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
-											 info->strategy);
 			page = BufferGetPage(buf);
 			opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 			Assert(opaque->hasho_bucket == cur_bucket);
@@ -571,6 +615,7 @@ loop_top:
 			{
 				IndexTuple	itup;
 				ItemPointer htup;
+				Bucket		bucket;
 
 				itup = (IndexTuple) PageGetItem(page,
 												PageGetItemId(page, offno));
@@ -581,32 +626,72 @@ loop_top:
 					deletable[ndeletable++] = offno;
 					tuples_removed += 1;
 				}
+				else if (bucket_has_garbage)
+				{
+					/* delete the tuples that were moved by a split. */
+					bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+											  local_metapage.hashm_maxbucket,
+											   local_metapage.hashm_highmask,
+											   local_metapage.hashm_lowmask);
+					if (bucket != cur_bucket)
+					{
+						/* mark the item for deletion */
+						deletable[ndeletable++] = offno;
+						tuples_removed += 1;
+					}
+				}
 				else
 					num_index_tuples += 1;
 			}
 
 			/*
-			 * Apply deletions and write page if needed, advance to next page.
+			 * We don't release the lock on the primary bucket page until the
+			 * end of the bucket scan.
 			 */
+			if (blkno != bucket_blkno)
+				release_buf = true;
+
 			blkno = opaque->hasho_nextblkno;
 
+			/*
+			 * Apply deletions and write page if needed, advance to next page.
+			 */
 			if (ndeletable > 0)
 			{
 				PageIndexMultiDelete(page, deletable, ndeletable);
-				_hash_wrtbuf(rel, buf);
+				if (release_buf)
+					_hash_wrtbuf(rel, buf);
+				else
+					MarkBufferDirty(buf);
 				bucket_dirty = true;
 			}
-			else
+			else if (release_buf)
 				_hash_relbuf(rel, buf);
+
+			/* bail out if there are no more pages to scan. */
+			if (!BlockNumberIsValid(blkno))
+				break;
+
+			buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
+											 LH_OVERFLOW_PAGE,
+											 info->strategy);
 		}
 
+		/*
+		 * Clear the garbage flag from the bucket after deleting the tuples
+		 * that were moved by a split.  We deliberately clear the flag before
+		 * squeezing the bucket, so that after a restart vacuum won't try
+		 * again to delete the moved-by-split tuples.
+		 */
+		if (bucket_has_garbage)
+			bucket_opaque->hasho_flag &= ~LH_BUCKET_PAGE_HAS_GARBAGE;
+
 		/* If we deleted anything, try to compact free space */
 		if (bucket_dirty)
-			_hash_squeezebucket(rel, cur_bucket, bucket_blkno,
+			_hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
 								info->strategy);
 
-		/* Release bucket lock */
-		_hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);
+		_hash_relbuf(rel, bucket_buf);
 
 		/* Advance to next bucket */
 		cur_bucket++;
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index acd2e64..eedf6ae 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -32,8 +32,6 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 	Buffer		metabuf;
 	HashMetaPage metap;
 	BlockNumber blkno;
-	BlockNumber oldblkno = InvalidBlockNumber;
-	bool		retry = false;
 	Page		page;
 	HashPageOpaque pageopaque;
 	Size		itemsz;
@@ -70,45 +68,22 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 			errhint("Values larger than a buffer page cannot be indexed.")));
 
 	/*
-	 * Loop until we get a lock on the correct target bucket.
+	 * Compute the target bucket number, and convert to block number.
 	 */
-	for (;;)
-	{
-		/*
-		 * Compute the target bucket number, and convert to block number.
-		 */
-		bucket = _hash_hashkey2bucket(hashkey,
+	bucket = _hash_hashkey2bucket(hashkey,
 									  metap->hashm_maxbucket,
 									  metap->hashm_highmask,
 									  metap->hashm_lowmask);
 
-		blkno = BUCKET_TO_BLKNO(metap, bucket);
-
-		/* Release metapage lock, but keep pin. */
-		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+	blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-		/*
-		 * If the previous iteration of this loop locked what is still the
-		 * correct target bucket, we are done.  Otherwise, drop any old lock
-		 * and lock what now appears to be the correct bucket.
-		 */
-		if (retry)
-		{
-			if (oldblkno == blkno)
-				break;
-			_hash_droplock(rel, oldblkno, HASH_SHARE);
-		}
-		_hash_getlock(rel, blkno, HASH_SHARE);
-
-		/*
-		 * Reacquire metapage lock and check that no bucket split has taken
-		 * place while we were awaiting the bucket lock.
-		 */
-		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
-		oldblkno = blkno;
-		retry = true;
-	}
+	_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
 
+	/*
+	 * FixMe: If a split happens during this insertion and doesn't account
+	 * for the tuple being inserted, then that tuple can be lost for future
+	 * searches.
+	 */
 	/* Fetch the primary bucket page for the bucket */
 	buf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BUCKET_PAGE);
 	page = BufferGetPage(buf);
@@ -141,10 +116,10 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 			 */
 
 			/* release our write lock without modifying buffer */
-			_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+			_hash_chgbufaccess(rel, buf, HASH_WRITE, HASH_NOLOCK);
 
 			/* chain to a new overflow page */
-			buf = _hash_addovflpage(rel, metabuf, buf);
+			buf = _hash_addovflpage(rel, metabuf, buf, false);
 			page = BufferGetPage(buf);
 
 			/* should fit now, given test above */
@@ -161,9 +136,6 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 	/* write and release the modified page */
 	_hash_wrtbuf(rel, buf);
 
-	/* We can drop the bucket lock now */
-	_hash_droplock(rel, blkno, HASH_SHARE);
-
 	/*
 	 * Write-lock the metapage so we can increment the tuple count. After
 	 * incrementing it, check to see if it's time for a split.
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index db3e268..184236c 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -82,23 +82,20 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
  *
  *	On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
  *	dropped before exiting (we assume the caller is not interested in 'buf'
- *	anymore).  The returned overflow page will be pinned and write-locked;
- *	it is guaranteed to be empty.
+ *	anymore), unless the caller asks to retain it.  The pin is retained
+ *	only if 'buf' is the primary bucket page.  The returned overflow page
+ *	will be pinned and write-locked; it is guaranteed to be empty.
  *
  *	The caller must hold a pin, but no lock, on the metapage buffer.
  *	That buffer is returned in the same state.
  *
- *	The caller must hold at least share lock on the bucket, to ensure that
- *	no one else tries to compact the bucket meanwhile.  This guarantees that
- *	'buf' won't stop being part of the bucket while it's unlocked.
- *
  * NB: since this could be executed concurrently by multiple processes,
  * one should not assume that the returned overflow page will be the
  * immediate successor of the originally passed 'buf'.  Additional overflow
  * pages might have been added to the bucket chain in between.
  */
 Buffer
-_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
+_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
 {
 	Buffer		ovflbuf;
 	Page		page;
@@ -131,7 +128,10 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 			break;
 
 		/* we assume we do not need to write the unmodified page */
-		_hash_relbuf(rel, buf);
+		if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
+			_hash_chgbufaccess(rel, buf, HASH_WRITE, HASH_NOLOCK);
+		else
+			_hash_relbuf(rel, buf);
 
 		buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
 	}
@@ -149,7 +149,13 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 
 	/* logically chain overflow page to previous page */
 	pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);
-	_hash_wrtbuf(rel, buf);
+	if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
+	{
+		MarkBufferDirty(buf);
+		_hash_chgbufaccess(rel, buf, HASH_WRITE, HASH_NOLOCK);
+	}
+	else
+		_hash_wrtbuf(rel, buf);
 
 	return ovflbuf;
 }
@@ -570,7 +576,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno,
  *	required that to be true on entry as well, but it's a lot easier for
  *	callers to leave empty overflow pages and let this guy clean it up.
  *
- *	Caller must hold exclusive lock on the target bucket.  This allows
+ *	Caller must hold cleanup lock on the target bucket.  This allows
  *	us to safely lock multiple pages in the bucket.
  *
  *	Since this function is invoked in VACUUM, we provide an access strategy
@@ -580,6 +586,7 @@ void
 _hash_squeezebucket(Relation rel,
 					Bucket bucket,
 					BlockNumber bucket_blkno,
+					Buffer bucket_buf,
 					BufferAccessStrategy bstrategy)
 {
 	BlockNumber wblkno;
@@ -591,16 +598,13 @@ _hash_squeezebucket(Relation rel,
 	HashPageOpaque wopaque;
 	HashPageOpaque ropaque;
 	bool		wbuf_dirty;
+	bool		release_buf = false;
 
 	/*
 	 * start squeezing into the base bucket page.
 	 */
 	wblkno = bucket_blkno;
-	wbuf = _hash_getbuf_with_strategy(rel,
-									  wblkno,
-									  HASH_WRITE,
-									  LH_BUCKET_PAGE,
-									  bstrategy);
+	wbuf = bucket_buf;
 	wpage = BufferGetPage(wbuf);
 	wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 
@@ -669,12 +673,17 @@ _hash_squeezebucket(Relation rel,
 			{
 				Assert(!PageIsEmpty(wpage));
 
+				if (wblkno != bucket_blkno)
+					release_buf = true;
+
 				wblkno = wopaque->hasho_nextblkno;
 				Assert(BlockNumberIsValid(wblkno));
 
-				if (wbuf_dirty)
+				if (wbuf_dirty && release_buf)
 					_hash_wrtbuf(rel, wbuf);
-				else
+				else if (wbuf_dirty)
+					MarkBufferDirty(wbuf);
+				else if (release_buf)
 					_hash_relbuf(rel, wbuf);
 
 				/* nothing more to do if we reached the read page */
@@ -700,6 +709,7 @@ _hash_squeezebucket(Relation rel,
 				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 				Assert(wopaque->hasho_bucket == bucket);
 				wbuf_dirty = false;
+				release_buf = false;
 			}
 
 			/*
@@ -733,11 +743,17 @@ _hash_squeezebucket(Relation rel,
 		/* are we freeing the page adjacent to wbuf? */
 		if (rblkno == wblkno)
 		{
-			/* yes, so release wbuf lock first */
-			if (wbuf_dirty)
+			if (wblkno != bucket_blkno)
+				release_buf = true;
+
+			/* yes, so release wbuf lock first if needed */
+			if (wbuf_dirty && release_buf)
 				_hash_wrtbuf(rel, wbuf);
-			else
+			else if (wbuf_dirty)
+				MarkBufferDirty(wbuf);
+			else if (release_buf)
 				_hash_relbuf(rel, wbuf);
+
 			/* free this overflow page (releases rbuf) */
 			_hash_freeovflpage(rel, rbuf, bstrategy);
 			/* done */
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 178463f..1ba4d52 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -38,7 +38,7 @@ static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock,
 					uint32 nblocks);
 static void _hash_splitbucket(Relation rel, Buffer metabuf,
 				  Bucket obucket, Bucket nbucket,
-				  BlockNumber start_oblkno,
+				  Buffer obuf,
 				  Buffer nbuf,
 				  uint32 maxbucket,
 				  uint32 highmask, uint32 lowmask);
@@ -55,46 +55,6 @@ static void _hash_splitbucket(Relation rel, Buffer metabuf,
 
 
 /*
- * _hash_getlock() -- Acquire an lmgr lock.
- *
- * 'whichlock' should the block number of a bucket's primary bucket page to
- * acquire the per-bucket lock.  (See README for details of the use of these
- * locks.)
- *
- * 'access' must be HASH_SHARE or HASH_EXCLUSIVE.
- */
-void
-_hash_getlock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		LockPage(rel, whichlock, access);
-}
-
-/*
- * _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free.
- *
- * Same as above except we return FALSE without blocking if lock isn't free.
- */
-bool
-_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		return ConditionalLockPage(rel, whichlock, access);
-	else
-		return true;
-}
-
-/*
- * _hash_droplock() -- Release an lmgr lock.
- */
-void
-_hash_droplock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		UnlockPage(rel, whichlock, access);
-}
-
-/*
  *	_hash_getbuf() -- Get a buffer by block number for read or write.
  *
  *		'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
@@ -489,9 +449,8 @@ _hash_pageinit(Page page, Size size)
 /*
  * Attempt to expand the hash table by creating one new bucket.
  *
- * This will silently do nothing if it cannot get the needed locks.
- *
- * The caller should hold no locks on the hash index.
+ * This will silently do nothing if our own backend has an active scan on the
+ * old bucket, or if that bucket still contains tuples from a previous split.
  *
  * The caller must hold a pin, but no lock, on the metapage buffer.
  * The buffer is returned in the same state.
@@ -506,6 +465,9 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	BlockNumber start_oblkno;
 	BlockNumber start_nblkno;
 	Buffer		buf_nblkno;
+	Buffer		buf_oblkno;
+	Page		opage;
+	HashPageOpaque oopaque;
 	uint32		maxbucket;
 	uint32		highmask;
 	uint32		lowmask;
@@ -548,11 +510,9 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 		goto fail;
 
 	/*
-	 * Determine which bucket is to be split, and attempt to lock the old
-	 * bucket.  If we can't get the lock, give up.
-	 *
-	 * The lock protects us against other backends, but not against our own
-	 * backend.  Must check for active scans separately.
+	 * Determine which bucket is to be split.  If it still contains tuples
+	 * from a previous split, or our own backend has an active scan on it,
+	 * then give up.
 	 */
 	new_bucket = metap->hashm_maxbucket + 1;
 
@@ -563,11 +523,18 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	if (_hash_has_active_scan(rel, old_bucket))
 		goto fail;
 
-	if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
+	buf_oblkno = _hash_getbuf(rel, start_oblkno, HASH_READ, LH_BUCKET_PAGE);
+
+	opage = BufferGetPage(buf_oblkno);
+	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+	if (oopaque->hasho_flag & LH_BUCKET_PAGE_HAS_GARBAGE)
+	{
+		_hash_relbuf(rel, buf_oblkno);
 		goto fail;
+	}
 
 	/*
-	 * Likewise lock the new bucket (should never fail).
+	 * There shouldn't be any active scan on the new bucket.
 	 *
 	 * Note: it is safe to compute the new bucket's blkno here, even though we
 	 * may still need to update the BUCKET_TO_BLKNO mapping.  This is because
@@ -579,9 +546,6 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	if (_hash_has_active_scan(rel, new_bucket))
 		elog(ERROR, "scan in progress on supposedly new bucket");
 
-	if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
-		elog(ERROR, "could not get lock on supposedly new bucket");
-
 	/*
 	 * If the split point is increasing (hashm_maxbucket's log base 2
 	 * increases), we need to allocate a new batch of bucket pages.
@@ -600,8 +564,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 		if (!_hash_alloc_buckets(rel, start_nblkno, new_bucket))
 		{
 			/* can't split due to BlockNumber overflow */
-			_hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
-			_hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
+			_hash_relbuf(rel, buf_oblkno);
 			goto fail;
 		}
 	}
@@ -665,13 +628,9 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	/* Relocate records to the new bucket */
 	_hash_splitbucket(rel, metabuf,
 					  old_bucket, new_bucket,
-					  start_oblkno, buf_nblkno,
+					  buf_oblkno, buf_nblkno,
 					  maxbucket, highmask, lowmask);
 
-	/* Release bucket locks, allowing others to access them */
-	_hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
-	_hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
-
 	return;
 
 	/* Here if decide not to split or fail to acquire old bucket lock */
@@ -745,6 +704,10 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
  * The buffer is returned in the same state.  (The metapage is only
  * touched if it becomes necessary to add or remove overflow pages.)
  *
+ * The split needs to hold pins on the primary bucket pages of both the old
+ * and new buckets until the operation completes.  This prevents vacuum from
+ * starting on these buckets while the split is in progress.
+ *
  * In addition, the caller must have created the new bucket's base page,
  * which is passed in buffer nbuf, pinned and write-locked.  That lock and
  * pin are released here.  (The API is set up this way because we must do
@@ -756,37 +719,46 @@ _hash_splitbucket(Relation rel,
 				  Buffer metabuf,
 				  Bucket obucket,
 				  Bucket nbucket,
-				  BlockNumber start_oblkno,
+				  Buffer obuf,
 				  Buffer nbuf,
 				  uint32 maxbucket,
 				  uint32 highmask,
 				  uint32 lowmask)
 {
-	Buffer		obuf;
+	Buffer		bucket_obuf;
+	Buffer		bucket_nbuf;
 	Page		opage;
 	Page		npage;
 	HashPageOpaque oopaque;
 	HashPageOpaque nopaque;
+	HashPageOpaque bucket_nopaque;
 
-	/*
-	 * It should be okay to simultaneously write-lock pages from each bucket,
-	 * since no one else can be trying to acquire buffer lock on pages of
-	 * either bucket.
-	 */
-	obuf = _hash_getbuf(rel, start_oblkno, HASH_WRITE, LH_BUCKET_PAGE);
+	bucket_nbuf = nbuf;
+	bucket_obuf = obuf;
 	opage = BufferGetPage(obuf);
 	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
 
+	/*
+	 * Mark the old bucket to indicate that it has deletable tuples. Vacuum
+	 * will clear this flag after deleting such tuples.
+	 */
+	oopaque->hasho_flag |= LH_BUCKET_PAGE_HAS_GARBAGE;
+
 	npage = BufferGetPage(nbuf);
 
-	/* initialize the new bucket's primary page */
+	/*
+	 * initialize the new bucket's primary page and mark it to indicate that
+	 * a split is in progress.
+	 */
 	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
 	nopaque->hasho_prevblkno = InvalidBlockNumber;
 	nopaque->hasho_nextblkno = InvalidBlockNumber;
 	nopaque->hasho_bucket = nbucket;
-	nopaque->hasho_flag = LH_BUCKET_PAGE;
+	nopaque->hasho_flag = LH_BUCKET_PAGE | LH_BUCKET_PAGE_SPLIT;
 	nopaque->hasho_page_id = HASHO_PAGE_ID;
 
+	bucket_nopaque = nopaque;
+
 	/*
 	 * Partition the tuples in the old bucket between the old bucket and the
 	 * new bucket, advancing along the old bucket's overflow bucket chain and
@@ -798,8 +770,6 @@ _hash_splitbucket(Relation rel,
 		BlockNumber oblkno;
 		OffsetNumber ooffnum;
 		OffsetNumber omaxoffnum;
-		OffsetNumber deletable[MaxOffsetNumber];
-		int			ndeletable = 0;
 
 		/* Scan each tuple in old page */
 		omaxoffnum = PageGetMaxOffsetNumber(opage);
@@ -822,6 +792,18 @@ _hash_splitbucket(Relation rel,
 
 			if (bucket == nbucket)
 			{
+				Size		itupsize = 0;
+
+				/*
+				 * Mark the index tuple as moved by split; such tuples are
+				 * skipped by scans that start while a split is in progress
+				 * on the primary bucket.
+				 */
+				itupsize = itup->t_info & INDEX_SIZE_MASK;
+				itup->t_info &= ~INDEX_SIZE_MASK;
+				itup->t_info |= INDEX_MOVED_BY_SPLIT_MASK;
+				itup->t_info |= itupsize;
+
 				/*
 				 * insert the tuple into the new bucket.  if it doesn't fit on
 				 * the current page in the new bucket, we must allocate a new
@@ -840,9 +822,10 @@ _hash_splitbucket(Relation rel,
 					/* write out nbuf and drop lock, but keep pin */
 					_hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
 					/* chain to a new overflow page */
-					nbuf = _hash_addovflpage(rel, metabuf, nbuf);
+					nbuf = _hash_addovflpage(rel, metabuf, nbuf,
+						nopaque->hasho_flag & LH_BUCKET_PAGE ? true : false);
 					npage = BufferGetPage(nbuf);
-					/* we don't need nopaque within the loop */
+					nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
 				}
 
 				/*
@@ -853,11 +836,6 @@ _hash_splitbucket(Relation rel,
 				 * the new page and qsort them before insertion.
 				 */
 				(void) _hash_pgaddtup(rel, nbuf, itemsz, itup);
-
-				/*
-				 * Mark tuple for deletion from old page.
-				 */
-				deletable[ndeletable++] = ooffnum;
 			}
 			else
 			{
@@ -870,15 +848,9 @@ _hash_splitbucket(Relation rel,
 
 		oblkno = oopaque->hasho_nextblkno;
 
-		/*
-		 * Done scanning this old page.  If we moved any tuples, delete them
-		 * from the old page.
-		 */
-		if (ndeletable > 0)
-		{
-			PageIndexMultiDelete(opage, deletable, ndeletable);
-			_hash_wrtbuf(rel, obuf);
-		}
+		/* retain the pin on the old primary bucket */
+		if (obuf == bucket_obuf)
+			_hash_chgbufaccess(rel, obuf, HASH_READ, HASH_NOLOCK);
 		else
 			_hash_relbuf(rel, obuf);
 
@@ -887,18 +859,24 @@ _hash_splitbucket(Relation rel,
 			break;
 
 		/* Else, advance to next old page */
-		obuf = _hash_getbuf(rel, oblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+		obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_OVERFLOW_PAGE);
 		opage = BufferGetPage(obuf);
 		oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
 	}
 
+	/* indicate that split is finished */
+	bucket_nopaque->hasho_flag &= ~LH_BUCKET_PAGE_SPLIT;
+
+	/* release the pin on the old primary bucket */
+	_hash_dropbuf(rel, bucket_obuf);
+
 	/*
 	 * We're at the end of the old bucket chain, so we're done partitioning
-	 * the tuples.  Before quitting, call _hash_squeezebucket to ensure the
-	 * tuples remaining in the old bucket (including the overflow pages) are
-	 * packed as tightly as possible.  The new bucket is already tight.
+	 * the tuples.
 	 */
 	_hash_wrtbuf(rel, nbuf);
 
-	_hash_squeezebucket(rel, obucket, start_oblkno, NULL);
+	/* release the pin on the new primary bucket */
+	if (!(nopaque->hasho_flag & LH_BUCKET_PAGE))
+		_hash_dropbuf(rel, bucket_nbuf);
 }
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 4825558..b73559a 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "utils/rel.h"
 
+static BlockNumber _hash_get_oldblk(Relation rel, HashPageOpaque opaque);
 
 /*
  *	_hash_next() -- Get the next item in a scan.
@@ -72,7 +73,23 @@ _hash_readnext(Relation rel,
 	BlockNumber blkno;
 
 	blkno = (*opaquep)->hasho_nextblkno;
-	_hash_relbuf(rel, *bufp);
+
+	/*
+	 * Retain the pin on the primary bucket page until the end of the scan,
+	 * to ensure that vacuum can't delete tuples (moved by a split to the
+	 * new bucket) which are still needed by scans that started on the
+	 * bucket being split before the new bucket's split-in-progress flag
+	 * (LH_BUCKET_PAGE_SPLIT) was cleared.  The requirement to retain the
+	 * pin could be relaxed for buckets that are not being split, e.g. by
+	 * maintaining a has_garbage-style flag on the bucket, but we still
+	 * need the pin during the squeeze phase, since otherwise the movement
+	 * of tuples could change the order of scan results.
+	 */
+	if ((*opaquep)->hasho_flag & LH_BUCKET_PAGE)
+		_hash_chgbufaccess(rel, *bufp, HASH_READ, HASH_NOLOCK);
+	else
+		_hash_relbuf(rel, *bufp);
+
 	*bufp = InvalidBuffer;
 	/* check for interrupts while we're not holding any buffer lock */
 	CHECK_FOR_INTERRUPTS();
@@ -94,7 +111,23 @@ _hash_readprev(Relation rel,
 	BlockNumber blkno;
 
 	blkno = (*opaquep)->hasho_prevblkno;
-	_hash_relbuf(rel, *bufp);
+
+	/*
+	 * Retain the pin on the primary bucket page until the end of the scan,
+	 * to ensure that vacuum can't delete tuples (moved by a split to the
+	 * new bucket) which are still needed by scans that started on the
+	 * bucket being split before the new bucket's split-in-progress flag
+	 * (LH_BUCKET_PAGE_SPLIT) was cleared.  The requirement to retain the
+	 * pin could be relaxed for buckets that are not being split, e.g. by
+	 * maintaining a has_garbage-style flag on the bucket, but we still
+	 * need the pin during the squeeze phase, since otherwise the movement
+	 * of tuples could change the order of scan results.
+	 */
+	if ((*opaquep)->hasho_flag & LH_BUCKET_PAGE)
+		_hash_chgbufaccess(rel, *bufp, HASH_READ, HASH_NOLOCK);
+	else
+		_hash_relbuf(rel, *bufp);
+
 	*bufp = InvalidBuffer;
 	/* check for interrupts while we're not holding any buffer lock */
 	CHECK_FOR_INTERRUPTS();
@@ -125,8 +158,6 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	uint32		hashkey;
 	Bucket		bucket;
 	BlockNumber blkno;
-	BlockNumber oldblkno = InvalidBuffer;
-	bool		retry = false;
 	Buffer		buf;
 	Buffer		metabuf;
 	Page		page;
@@ -192,52 +223,21 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	metap = HashPageGetMeta(page);
 
 	/*
-	 * Loop until we get a lock on the correct target bucket.
+	 * Compute the target bucket number, and convert to block number.
 	 */
-	for (;;)
-	{
-		/*
-		 * Compute the target bucket number, and convert to block number.
-		 */
-		bucket = _hash_hashkey2bucket(hashkey,
-									  metap->hashm_maxbucket,
-									  metap->hashm_highmask,
-									  metap->hashm_lowmask);
-
-		blkno = BUCKET_TO_BLKNO(metap, bucket);
-
-		/* Release metapage lock, but keep pin. */
-		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
-
-		/*
-		 * If the previous iteration of this loop locked what is still the
-		 * correct target bucket, we are done.  Otherwise, drop any old lock
-		 * and lock what now appears to be the correct bucket.
-		 */
-		if (retry)
-		{
-			if (oldblkno == blkno)
-				break;
-			_hash_droplock(rel, oldblkno, HASH_SHARE);
-		}
-		_hash_getlock(rel, blkno, HASH_SHARE);
+	bucket = _hash_hashkey2bucket(hashkey,
+								  metap->hashm_maxbucket,
+								  metap->hashm_highmask,
+								  metap->hashm_lowmask);
 
-		/*
-		 * Reacquire metapage lock and check that no bucket split has taken
-		 * place while we were awaiting the bucket lock.
-		 */
-		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
-		oldblkno = blkno;
-		retry = true;
-	}
+	blkno = BUCKET_TO_BLKNO(metap, bucket);
 
 	/* done with the metapage */
-	_hash_dropbuf(rel, metabuf);
+	_hash_relbuf(rel, metabuf);
 
 	/* Update scan opaque state to show we have lock on the bucket */
 	so->hashso_bucket = bucket;
 	so->hashso_bucket_valid = true;
-	so->hashso_bucket_blkno = blkno;
 
 	/* Fetch the primary bucket page for the bucket */
 	buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
@@ -245,6 +245,54 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	Assert(opaque->hasho_bucket == bucket);
 
+	so->hashso_bucket_buf = buf;
+
+	/*
+	 * If a bucket split is in progress, then we need to skip tuples that
+	 * were moved from the old bucket.  To ensure that vacuum doesn't clean
+	 * any tuples from the old or new bucket while this scan is in
+	 * progress, maintain a pin on both buckets.  Here we must be careful
+	 * about lock ordering: first acquire the lock on the old bucket,
+	 * release that lock (but not the pin), then acquire the lock on the
+	 * new bucket and re-verify whether the split is still in progress.
+	 * Acquiring the old bucket's lock first ensures that vacuum waits for
+	 * this scan to finish.
+	 */
+	if (opaque->hasho_flag & LH_BUCKET_PAGE_SPLIT)
+	{
+		BlockNumber old_blkno;
+		Buffer		old_buf;
+
+		old_blkno = _hash_get_oldblk(rel, opaque);
+
+		/*
+		 * release the new bucket's buffer; we will re-acquire it after
+		 * taking the lock on the old bucket.
+		 */
+		_hash_relbuf(rel, buf);
+
+		old_buf = _hash_getbuf(rel, old_blkno, HASH_READ, LH_BUCKET_PAGE);
+
+		/*
+		 * remember the old bucket buffer so its pin can be released later.
+		 */
+		so->hashso_old_bucket_buf = old_buf;
+		_hash_chgbufaccess(rel, old_buf, HASH_READ, HASH_NOLOCK);
+
+		buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
+		page = BufferGetPage(buf);
+		opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+		Assert(opaque->hasho_bucket == bucket);
+
+		if (opaque->hasho_flag & LH_BUCKET_PAGE_SPLIT)
+			so->hashso_skip_moved_tuples = true;
+		else
+		{
+			_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+			so->hashso_old_bucket_buf = InvalidBuffer;
+		}
+	}
+
 	/* If a backwards scan is requested, move to the end of the chain */
 	if (ScanDirectionIsBackward(dir))
 	{
@@ -273,6 +321,13 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
  *		false.  Else, return true and set the hashso_curpos for the
  *		scan to the right thing.
  *
+ *		Here we also scan the old bucket if a split of the current bucket
+ *		was in progress at the start of the scan.  The basic idea is to
+ *		skip the tuples moved by the split while scanning the current
+ *		bucket, and then scan the old bucket to cover all such tuples.
+ *		This ensures that we don't miss any tuples in the current scan
+ *		when a split was in progress.
+ *
  *		'bufP' points to the current buffer, which is pinned and read-locked.
  *		On success exit, we have pin and read-lock on whichever page
  *		contains the right item; on failure, we have released all buffers.
@@ -338,6 +393,16 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					{
 						Assert(offnum >= FirstOffsetNumber);
 						itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+						/*
+						 * Skip tuples that were moved by a split operation
+						 * if this scan started while the split was in
+						 * progress.
+						 */
+						if (so->hashso_skip_moved_tuples &&
+							(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
+						{
+							/* skip it, but keep walking this page */
+							offnum = OffsetNumberNext(offnum);
+							continue;
+						}
+
 						if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
 							break;		/* yes, so exit for-loop */
 					}
@@ -353,9 +418,52 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					}
 					else
 					{
-						/* end of bucket */
-						itup = NULL;
-						break;	/* exit for-loop */
+						/*
+						 * End of bucket; scan the old bucket if a split was
+						 * in progress at the start of the scan.
+						 */
+						if (so->hashso_skip_moved_tuples)
+						{
+							page = BufferGetPage(so->hashso_bucket_buf);
+							opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+							blkno = _hash_get_oldblk(rel, opaque);
+
+							Assert(BlockNumberIsValid(blkno));
+							buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
+
+							/*
+							 * Remember the old bucket buffer so we can
+							 * release its pin at the end of the scan.  If we
+							 * already hold a pin on the old buffer, release
+							 * it first; one pin is enough to hold off vacuum
+							 * from cleaning the bucket being scanned.
+							 */
+							if (BufferIsValid(so->hashso_old_bucket_buf))
+							{
+								_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+								so->hashso_old_bucket_buf = InvalidBuffer;
+							}
+							so->hashso_old_bucket_buf = buf;
+
+							page = BufferGetPage(buf);
+							maxoff = PageGetMaxOffsetNumber(page);
+							offnum = _hash_binsearch(page, so->hashso_sk_hash);
+
+							/*
+							 * Setting hashso_skip_moved_tuples to false
+							 * ensures that we don't filter out moved-by-split
+							 * tuples in the old bucket, and also that we
+							 * won't try to scan the old bucket again once
+							 * the scan of it has finished.
+							 */
+							so->hashso_skip_moved_tuples = false;
+						}
+						else
+						{
+							itup = NULL;
+							break;		/* exit for-loop */
+						}
 					}
 				}
 				break;
@@ -379,6 +487,16 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					{
 						Assert(offnum <= maxoff);
 						itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+						/*
+						 * Skip tuples that were moved by a split operation
+						 * if this scan started while the split was in
+						 * progress.
+						 */
+						if (so->hashso_skip_moved_tuples &&
+							(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
+						{
+							/* skip it, but keep walking this page */
+							offnum = OffsetNumberPrev(offnum);
+							continue;
+						}
+
 						if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
 							break;		/* yes, so exit for-loop */
 					}
@@ -394,9 +512,64 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					}
 					else
 					{
-						/* end of bucket */
-						itup = NULL;
-						break;	/* exit for-loop */
+						/*
+						 * End of bucket; scan the old bucket if a split was
+						 * in progress at the start of the scan.
+						 */
+						if (so->hashso_skip_moved_tuples)
+						{
+							page = BufferGetPage(so->hashso_bucket_buf);
+							opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+							blkno = _hash_get_oldblk(rel, opaque);
+
+							/* read the old page */
+							Assert(BlockNumberIsValid(blkno));
+							buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
+
+							/*
+							 * Remember the old bucket buffer so we can
+							 * release its pin at the end of the scan.  If we
+							 * already hold a pin on the old buffer, release
+							 * it first; one pin is enough to hold off vacuum
+							 * from cleaning the bucket being scanned.
+							 */
+							if (BufferIsValid(so->hashso_old_bucket_buf))
+							{
+								_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+								so->hashso_old_bucket_buf = InvalidBuffer;
+							}
+							so->hashso_old_bucket_buf = buf;
+
+							page = BufferGetPage(buf);
+
+							opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+							/*
+							 * For a backward scan, we need to start from the
+							 * last overflow page of the old bucket and work
+							 * back to the primary bucket page.
+							 */
+							while (BlockNumberIsValid(opaque->hasho_nextblkno))
+								_hash_readnext(rel, &buf, &page, &opaque);
+
+							maxoff = PageGetMaxOffsetNumber(page);
+							offnum = _hash_binsearch(page, so->hashso_sk_hash);
+
+							/*
+							 * Setting hashso_skip_moved_tuples to false
+							 * ensures that we don't filter out moved-by-split
+							 * tuples in the old bucket, and also that we
+							 * won't try to scan the old bucket again once
+							 * the scan of it has finished.
+							 */
+							so->hashso_skip_moved_tuples = false;
+						}
+						else
+						{
+							itup = NULL;
+							break;		/* exit for-loop */
+						}
 					}
 				}
 				break;
@@ -425,3 +598,39 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 	ItemPointerSet(current, blkno, offnum);
 	return true;
 }
+
+/*
+ *	_hash_get_oldblk() -- get the block number of the bucket from which
+ *			the current bucket is being split.
+ */
+static BlockNumber
+_hash_get_oldblk(Relation rel, HashPageOpaque opaque)
+{
+	Bucket		curr_bucket;
+	Bucket		old_bucket;
+	uint32		mask;
+	Buffer		metabuf;
+	HashMetaPage metap;
+	BlockNumber blkno;
+
+	/*
+	 * To get the old bucket from the current bucket, we need a mask to modulo
+	 * into the lower half of the table.  This mask is stored in the meta page
+	 * as hashm_lowmask, but we can't rely on that here, because we need the
+	 * value of lowmask that was in effect when the bucket split started.
+	 * lowmask is always equal to the last bucket number in the lower half of
+	 * the table, which can be calculated from the current bucket.
+	 */
+	curr_bucket = opaque->hasho_bucket;
+	mask = (((uint32) 1) << _hash_log2((uint32) curr_bucket) / 2) - 1;
+	old_bucket = curr_bucket & mask;
+
+	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+	metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+	blkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+	_hash_relbuf(rel, metabuf);
+
+	return blkno;
+}
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index fa3f9b6..cd40ed7 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -52,6 +52,8 @@ typedef uint32 Bucket;
 #define LH_BUCKET_PAGE			(1 << 1)
 #define LH_BITMAP_PAGE			(1 << 2)
 #define LH_META_PAGE			(1 << 3)
+#define LH_BUCKET_PAGE_SPLIT	(1 << 4)
+#define LH_BUCKET_PAGE_HAS_GARBAGE	(1 << 5)
 
 typedef struct HashPageOpaqueData
 {
@@ -88,12 +90,6 @@ typedef struct HashScanOpaqueData
 	bool		hashso_bucket_valid;
 
 	/*
-	 * If we have a share lock on the bucket, we record it here.  When
-	 * hashso_bucket_blkno is zero, we have no such lock.
-	 */
-	BlockNumber hashso_bucket_blkno;
-
-	/*
 	 * We also want to remember which buffer we're currently examining in the
 	 * scan. We keep the buffer pinned (but not locked) across hashgettuple
 	 * calls, in order to avoid doing a ReadBuffer() for every tuple in the
@@ -101,11 +97,23 @@ typedef struct HashScanOpaqueData
 	 */
 	Buffer		hashso_curbuf;
 
+	/* remember the buffer associated with primary bucket */
+	Buffer		hashso_bucket_buf;
+
+	/*
+	 * remember the buffer associated with the old primary bucket, which is
+	 * needed while scanning a bucket for which a split is in progress.
+	 */
+	Buffer		hashso_old_bucket_buf;
+
 	/* Current position of the scan, as an index TID */
 	ItemPointerData hashso_curpos;
 
 	/* Current position of the scan, as a heap TID */
 	ItemPointerData hashso_heappos;
+
+	/* Whether scan needs to skip tuples that are moved by split */
+	bool		hashso_skip_moved_tuples;
 } HashScanOpaqueData;
 
 typedef HashScanOpaqueData *HashScanOpaque;
@@ -176,6 +184,8 @@ typedef HashMetaPageData *HashMetaPage;
 				  sizeof(ItemIdData) - \
 				  MAXALIGN(sizeof(HashPageOpaqueData)))
 
+#define INDEX_MOVED_BY_SPLIT_MASK	0x2000
+
 #define HASH_MIN_FILLFACTOR			10
 #define HASH_DEFAULT_FILLFACTOR		75
 
@@ -299,19 +309,17 @@ extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
 			   Size itemsize, IndexTuple itup);
 
 /* hashovfl.c */
-extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf);
+extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
 extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf,
 				   BufferAccessStrategy bstrategy);
 extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
 				 BlockNumber blkno, ForkNumber forkNum);
 extern void _hash_squeezebucket(Relation rel,
 					Bucket bucket, BlockNumber bucket_blkno,
+					Buffer bucket_buf,
 					BufferAccessStrategy bstrategy);
 
 /* hashpage.c */
-extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access);
-extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access);
-extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access);
 extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno,
 			 int access, int flags);
 extern Buffer _hash_getinitbuf(Relation rel, BlockNumber blkno);
