Hello, hackers.

I think I was able to fix the issue related to minRecoveryPoint and crash
recovery. To make sure standby will be consistent after crash recovery, we
need to take the current value of minRecoveryPoint into account while
setting LP_DEAD hints (almost the same way as it is done for *heap* hint
bits already).

I have introduced new structure IndexHintBitsData:
-------
    /* guaranteed not visible for all backends */
    bool all_dead;

    /* latest removed xid if known */
    TransactionId latest_removed_xid;

     /* lsn of page where dead tuple located */
    XLogRecPtr page_lsn;
-------

This structure is filled by the `heap_hot_search_buffer` function. After,
we decide to set or not `kill_prior_tuple` depending on its content
(calling `IsMarkBufferDirtyIndexHintAllowed`).

For primary - it is always safe to set LP_DEAD in index if `all_dead` ==
true.

In the case of standby, we need to check `latest_removed_xid` (if
available) first. If commit LSN of the latest removed xid is already lower
than minRecoveryPoint (`XLogNeedsFlush`) - it is safe to set
`kill_prior_tuple`.

Sometimes we are not sure about the latest removed xid - heap record could
be marked dead by the XLOG_HEAP2_CLEAN record, for example. In such a case
we check the LSN of the *heap* page containing the tuple (LSN could be
updated by other transactions already - but it does not matter in that
situation). If page LSN is lower than minRecoveryPoint - it is safe to set
LP_DEAD in the index too. Otherwise - just leave the index tuple alive.


So, to bring it all together:

* Normal operation, proc->indexIgnoreKilledTuples is true:
      It is safe for standby to use hint bits from the primary FPI because
of XLOG_INDEX_HINT_BITS_HORIZON conflict resolution.
      It is safe for standby to set its index hint bits because
`ComputeXidHorizons` honors other read-only procs xmin and lowest xid on
primary (`KnownAssignedXidsGetOldestXmin`).

* Normal operation, proc->indexIgnoreKilledTuples is false:
      Index hint bits are never set or taken into account.

* Crash recovery, proc->indexIgnoreKilledTuples is true:
      It is safe for standby to use hint bits from the primary FPW because
XLOG_INDEX_HINT_BITS_HORIZON is always logged before FPI, and commit record
of transaction removed the tuple is logged before
XLOG_INDEX_HINT_BITS_HORIZON. So, if FPI with hints was flushed (and taken
into account by minRecoveryPoint) - both transaction-remover and horizon
records are replayed before reading queries.
      It is safe for standby to use its hint bits because they can be set
only if the commit record of transaction-remover is lower than
minRecoveryPoint or LSN of heap page with removed tuples is lower than
minRecoveryPoint.

* Crash recovery, proc->indexIgnoreKilledTuples is false:
      Index hint bits are never set or taken into account.

So, now it seems correct to me.

Another interesting point here - now position of minRecoveryPoint affects
performance a lot. It is happening already (because of *heap* hint bits)
but after the patch, it is noticeable even more. Is there any sense to keep
minRecoveryPoint at a low value?

Rebased and updated patch in attachment.

Will be happy if someone could recheck my ideas or even the code :)

Thanks a lot,
Michail.
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 96442ceb4e..6399184a8c 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -10,6 +10,7 @@
 #-------------------------------------------------------------------------
 
 EXTRA_INSTALL=contrib/test_decoding
+EXTRA_INSTALL+=contrib/pageinspect
 
 subdir = src/test/recovery
 top_builddir = ../../..
diff --git a/src/test/recovery/t/022_index_hint_bits.pl b/src/test/recovery/t/022_index_hint_bits.pl
new file mode 100644
index 0000000000..737dca0185
--- /dev/null
+++ b/src/test/recovery/t/022_index_hint_bits.pl
@@ -0,0 +1,282 @@
+# Checks that index hints on standby work as excepted.
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 15;
+use Config;
+
+# Initialize primary node
+my $node_primary = get_new_node('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf('postgresql.conf', qq{
+    autovacuum = off
+    enable_seqscan = off
+    enable_indexonlyscan = off
+});
+$node_primary->start;
+
+$node_primary->safe_psql('postgres', 'CREATE EXTENSION pageinspect');
+# Create test table with primary index
+$node_primary->safe_psql(
+    'postgres', 'CREATE TABLE test_index_hint (id int, value int)');
+$node_primary->safe_psql(
+    'postgres', 'CREATE INDEX test_index ON test_index_hint (value, id)');
+# Fill some data to it, note to not put a lot of records to avoid
+# heap_page_prune_opt call which cause conflict on recovery hiding conflict
+# caused due index hint bits
+$node_primary->safe_psql('postgres',
+    'INSERT INTO test_index_hint VALUES (generate_series(1, 30), 0)');
+# And vacuum to allow index hint bits to be set
+$node_primary->safe_psql('postgres', 'VACUUM test_index_hint');
+# For fail-fast in case FPW from primary
+$node_primary->safe_psql('postgres', 'CHECKPOINT');
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Restore standby node from backup backup
+my $node_standby_1 = get_new_node('standby_1');
+$node_standby_1->init_from_backup($node_primary, $backup_name,
+    has_streaming => 1);
+
+my $standby_settings = qq{
+    max_standby_streaming_delay = 1
+    wal_receiver_status_interval = 1
+    hot_standby_feedback = on
+    enable_seqscan = off
+    enable_indexonlyscan = off
+};
+$node_standby_1->append_conf('postgresql.conf', $standby_settings);
+$node_standby_1->start;
+
+$node_standby_1->backup($backup_name);
+
+# Create second standby node linking to standby 1
+my $node_standby_2 = get_new_node('standby_2');
+$node_standby_2->init_from_backup($node_standby_1, $backup_name,
+    has_streaming => 1);
+$node_standby_2->append_conf('postgresql.conf', $standby_settings);
+$node_standby_2->start;
+
+# Make sure sender_propagates_feedback_to_primary is set on standbys
+wait_hfs($node_primary, 1);
+wait_hfs($node_standby_1, 1);
+
+# To avoid hanging while expecting some specific input from a psql
+# instance being driven by us, add a timeout high enough that it
+# should never trigger even on very slow machines, unless something
+# is really wrong.
+my $psql_timeout = IPC::Run::timer(30);
+
+# One psql to run command in repeatable read isolation level
+my %psql_standby_repeatable_read = ('stdin' => '', 'stdout' => '', 'stderr' => '');
+$psql_standby_repeatable_read{run} =
+    IPC::Run::start(
+        [ 'psql', '-XAb', '-f', '-', '-d', $node_standby_1->connstr('postgres') ],
+        '<', \$psql_standby_repeatable_read{stdin},
+        '>', \$psql_standby_repeatable_read{stdout},
+        '2>', \$psql_standby_repeatable_read{stderr},
+        $psql_timeout);
+
+# Another psql to run command in read committed isolation level
+my %psql_standby_read_committed = ('stdin' => '', 'stdout' => '', 'stderr' => '');
+$psql_standby_read_committed{run} =
+    IPC::Run::start(
+        [ 'psql', '-XAb', '-f', '-', '-d', $node_standby_1->connstr('postgres') ],
+        '<', \$psql_standby_read_committed{stdin},
+        '>', \$psql_standby_read_committed{stdout},
+        '2>', \$psql_standby_read_committed{stderr},
+        $psql_timeout);
+
+# Start RR transaction and read first row from index
+ok(send_query_and_wait(\%psql_standby_repeatable_read,
+    q[
+BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;
+],
+    qr/1\n\(1 row\)/m),
+    'row is visible in repeatable read');
+
+# Start RC transaction and read first row from index
+ok(send_query_and_wait(\%psql_standby_read_committed,
+    q[
+BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
+SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;
+],
+    qr/1\n\(1 row\)/m),
+    'row is visible in read committed');
+
+# Now delete first 10 rows in index
+$node_primary->safe_psql('postgres',
+    'UPDATE test_index_hint SET value = 1 WHERE id <= 10');
+
+# Make sure hint bits are not set on primary
+is(hints_num($node_primary), qq(0), 'no index hint bits are set on primary yet');
+
+# Make sure page is not processed by heap_page_prune_opt
+is(non_normal_num($node_primary), qq(0), 'all items are normal in heap');
+
+# Wait for standbys to catch up transaction
+wait_for_catchup_all();
+
+# Disable hot_standby_feedback to trigger conflicts later
+$node_standby_1->safe_psql('postgres',
+    'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_1->reload;
+
+# Make sure sender_propagates_feedback_to_primary is not set on standby
+wait_hfs($node_primary, 0);
+wait_hfs($node_standby_1, 1);
+
+# Try to set hint bits in index on standby
+try_to_set_hint_bits();
+
+# Make sure read committed transaction is able to see correct data
+ok(send_query_and_wait(\%psql_standby_read_committed,
+    q/SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;/,
+    qr/11\n\(1 row\)/m),
+    'session is not canceled for read committed');
+
+# Make sure previous queries not set the hints on standby because
+# of parallel transaction running
+ok(send_query_and_wait(\%psql_standby_repeatable_read,
+    q/SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;/,
+    qr/1\n\(1 row\)/m),
+    'hints on standby are not set');
+
+is(hints_num($node_standby_1), qq(0), 'no index hint bits are set on standby yet');
+
+
+# Set index hint bits and replicate to standby
+$node_primary->safe_psql('postgres',
+    'SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;');
+
+# Make sure page is not processed by heap_page_prune_opt
+is(non_normal_num($node_primary), qq(0), 'all items are normal in heap');
+# Make sure hint bits are set
+is(hints_num($node_primary), qq(10), 'hint bits are set on primary already');
+
+## Wait for standbys to catch up hint bits
+wait_for_catchup_all();
+
+# Make sure read committed transaction is able to see correct data
+ok(send_query_and_wait(\%psql_standby_read_committed,
+    q/SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;/,
+    qr/11\n\(1 row\)/m),
+    'session is not canceled for read committed');
+
+# Make sure repeatable read transaction is canceled because of XLOG_INDEX_HINT_BITS_HORIZON from primary
+ok((send_query_and_wait(\%psql_standby_repeatable_read,
+    q/SELECT id FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;/,
+    qr/.*terminating connection due to conflict with recovery.*/m)),
+    'session is canceled for repeatable read');
+
+# Try to set hint bits in index on standby
+try_to_set_hint_bits();
+
+is(hints_num($node_standby_1), qq(0),
+    'hints are not set on standby1 because hs feedback is off');
+is(hints_num($node_standby_2), qq(0),
+    'hint bits are not set on standby2 because hs feedback chain is broker');
+
+# Enable hot_standby_feedback to allow hint bits to be set
+$node_standby_1->safe_psql('postgres',
+    'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_1->reload;
+
+# Make sure sender_propagates_feedback_to_primary is now set on standbys
+wait_hfs($node_primary, 1);
+wait_hfs($node_standby_1, 1);
+
+# Try to set hint bits in index on standby
+try_to_set_hint_bits();
+
+is(hints_num($node_standby_1), qq(10),
+    'hint bits are set on standby 1 yet because feedback is on');
+is(hints_num($node_standby_2), qq(10),
+    'hint bits are set on standby 2 yet because feedback chain is uninterrupted');
+
+$node_primary->stop();
+$node_standby_1->stop();
+$node_standby_2->stop();
+
+# Send query, wait until string matches
+sub send_query_and_wait {
+    my ($psql, $query, $untl) = @_;
+
+    # send query
+    $$psql{stdin} .= $query;
+    $$psql{stdin} .= "\n";
+
+    # wait for query results
+    $$psql{run}->pump_nb();
+    while (1) {
+        # See PostgresNode.pm's psql()
+        $$psql{stdout} =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+
+        #diag("\n" . $$psql{stdout}); # for debugging
+        #diag("\n" . $$psql{stderr}); # for debugging
+
+        last if $$psql{stdout} =~ /$untl/;
+        last if $$psql{stderr} =~ /$untl/;
+
+        if ($psql_timeout->is_expired) {
+            BAIL_OUT("aborting wait: program timed out \n" .
+                "stream contents: >>$$psql{stdout}<< \n" .
+                "pattern searched for: $untl");
+            return 0;
+        }
+        if (not $$psql{run}->pumpable()) {
+            # This is fine for some tests, keep running
+            return 0;
+        }
+        $$psql{run}->pump();
+        select(undef, undef, undef, 0.01); # sleep a little
+
+    }
+
+    $$psql{stdout} = '';
+
+    return 1;
+}
+
+sub try_to_set_hint_bits {
+    # Try to set hint bits in index on standby
+    foreach (0 .. 3) {
+        $node_standby_1->safe_psql('postgres',
+            'SELECT * FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;');
+        $node_standby_2->safe_psql('postgres',
+            'SELECT * FROM test_index_hint WHERE value = 0 ORDER BY id LIMIT 1;');
+    }
+}
+
+sub wait_for_catchup_all {
+    $node_primary->wait_for_catchup($node_standby_1, 'replay',
+        $node_primary->lsn('insert'));
+    $node_standby_1->wait_for_catchup($node_standby_2, 'replay',
+        $node_standby_1->lsn('replay'));
+}
+
+sub hints_num {
+    my ($node) = @_;
+    return $node->safe_psql('postgres',
+        "SELECT count(*) FROM bt_page_items('test_index', 1) WHERE dead = true");
+}
+
+sub non_normal_num {
+    my ($node) = @_;
+    return $node->safe_psql('postgres',
+        "SELECT COUNT(*) FROM heap_page_items(get_raw_page('test_index_hint', 0)) WHERE lp_flags != 1");
+}
+
+sub wait_hfs {
+    my ($node, $n) = @_;
+    $node->poll_query_until('postgres',
+        "SELECT (SELECT COUNT(*) FROM (SELECT * FROM pg_stat_replication WHERE backend_xmin IS NOT NULL) AS X) = $n")
+            or die 'backend_xmin is invalid';
+    # Make sure we have received reply to feedback message
+    sleep(2);
+}
\ No newline at end of file
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 9496f76b1f..56039ab5a6 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1733,6 +1733,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for recovery conflict resolution for dropping a
        tablespace.</entry>
      </row>
+      <row>
+       <entry><literal>RecoveryConflictSnapshotIndexHintBits</literal></entry>
+       <entry>Waiting for recovery conflict resolution for index hit bits.</entry>
+      </row>
      <row>
       <entry><literal>RecoveryPause</literal></entry>
       <entry>Waiting for recovery to be resumed.</entry>
@@ -1907,6 +1911,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update dynamic shared memory allocation
        information.</entry>
      </row>
+     <row>
+      <entry><literal>IndexHintHorizonShmem</literal></entry>
+      <entry>Waiting to read or update information about the latest index hint
+      bits horizon.</entry>
+     </row>
      <row>
       <entry><literal>LockFastPath</literal></entry>
       <entry>Waiting to read or update a process' fast-path lock
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 92205325fb..05d29e13dc 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -473,6 +473,11 @@ LSN of the page, and only act to set LP_DEAD bits when the LSN has not
 changed at all. (Avoiding dropping the pin entirely also makes it safe, of
 course.)
 
+LP_DEAD are not WAL logged themself, but they could reach standby as part
+of the full-page write. To allow usage of LP_DEAD hints on hot standby
+without MVCC failures latestRemovedXid of entries that are marked dead is WAL
+logged (index hint bits horizon).
+
 Bottom-Up deletion
 ------------------
 
@@ -653,17 +658,30 @@ lax about how same-level locks are acquired during recovery (most kinds
 of readers could still move right to recover if we didn't couple
 same-level locks), but we prefer to be conservative here.
 
-During recovery all index scans start with ignore_killed_tuples = false
-and we never set kill_prior_tuple. We do this because the oldest xmin
-on the standby server can be older than the oldest xmin on the primary
-server, which means tuples can be marked LP_DEAD even when they are
-still visible on the standby. We don't WAL log tuple LP_DEAD bits, but
-they can still appear in the standby because of full page writes. So
-we must always ignore them in standby, and that means it's not worth
-setting them either.  (When LP_DEAD-marked tuples are eventually deleted
-on the primary, the deletion is WAL-logged.  Queries that run on a
-standby therefore get much of the benefit of any LP_DEAD setting that
-takes place on the primary.)
+There is some complexity in using LP_DEAD bits during recovery. Generally,
+bits could be set and read by scan, but there is a possibility to meet
+the bit applied on the primary. We don't WAL log tuple LP_DEAD bits, but
+they can still appear on the standby because of the full-page writes. Such
+a cause could cause MVCC failures because the oldest xmin on the standby
+server can be older than the oldest xmin on the primary server, which means
+tuples can be marked LP_DEAD even when they are still visible on the standby.
+
+To prevent such failure, we WAL log latestRemovedXid of entries that are
+marked dead. This WAL record is used to raise conflicts on standby. But in
+some workloads, it could cause a very high rate of conflicts. To eliminate
+such conflicts each scan decides to use hint bits or not at the moment of
+getting the snapshot (ignore_killed_tuples flag). If no - bits are ignored,
+but conflict resolution is also ignored. The decision is based on effective
+hot_standby_feedback status. If hot_standby_feedback is enabled and backend
+xmin is effectively honored by the primary, then ignore_killed_tuples is set
+to true. BTW, hot_standby_feedback has nothing with MVCC correctness, it is
+just taken into account to reduce unnecessary backend cancelation.
+
+Also, there is a restriction on settings LP_DEAD bits by the standby. It is not
+allowed to set bits on the page if the commit record of latestRemovedXid is less
+than minRecoveryPoint. If the latestRemovedXid is invalid (happens if tuples
+were cleared by XLOG_HEAP2_CLEAN) - we need to check the current LSN of the
+page. If it is less than minRecoveryPoint - it is safe to set hint bits.
 
 Note that we talk about scans that are started during recovery. We go to
 a little trouble to allow a scan to start during recovery and end during
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..401d87b2c0 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -633,13 +633,15 @@ In some cases, we write additional information to data blocks without
 writing a preceding WAL record. This should only happen iff the data can
 be reconstructed later following a crash and the action is simply a way
 of optimising for performance. When a hint is written we use
-MarkBufferDirtyHint() to mark the block dirty.
+MarkBufferDirtyHint() or MarkBufferDirtyindexHint() to mark the block dirty.
 
 If the buffer is clean and checksums are in use then MarkBufferDirtyHint()
 inserts an XLOG_FPI_FOR_HINT record to ensure that we take a full page image
 that includes the hint. We do this to avoid a partial page write, when we
 write the dirtied page. WAL is not written during recovery, so we simply skip
-dirtying blocks because of hints when in recovery.
+dirtying blocks because of hints when in recovery. MarkBufferDirtyIndexHint()
+also, optionally WAL logs new index hint bits horizon value allowing
+LP_DEAD hints to be read\set on standby without MVCC failures.
 
 If you do decide to optimise away a WAL record, then any calls to
 MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index c8f7e781c6..8e7c7fe6ae 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -17,6 +17,7 @@
 #include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
+#include "access/heapam_xlog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -67,6 +68,7 @@ gistkillitems(IndexScanDesc scan)
 	{
 		UnlockReleaseBuffer(buffer);
 		so->numKilled = 0;		/* reset counter */
+		so->killedLatestRemovedXid = InvalidTransactionId;
 		return;
 	}
 
@@ -87,7 +89,9 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		MarkBufferDirtyIndexHint(buffer, true,
+								 scan->indexRelation,
+								 so->killedLatestRemovedXid);
 	}
 
 	UnlockReleaseBuffer(buffer);
@@ -97,6 +101,7 @@ gistkillitems(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 }
 
 /*
@@ -666,8 +671,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 						MemoryContextSwitchTo(oldCxt);
 					}
 					if (so->numKilled < MaxIndexTuplesPerPage)
+					{
 						so->killedItems[so->numKilled++] =
 							so->pageData[so->curPageData - 1].offnum;
+						IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+															&so->killedLatestRemovedXid);
+					}
 				}
 				/* continuing to return tuples from a leaf page */
 				scan->xs_heaptid = so->pageData[so->curPageData].heapPtr;
@@ -703,8 +712,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 					MemoryContextSwitchTo(oldCxt);
 				}
 				if (so->numKilled < MaxIndexTuplesPerPage)
+				{
 					so->killedItems[so->numKilled++] =
 						so->pageData[so->curPageData - 1].offnum;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 			/* find and process the next index page */
 			do
diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c
index 61e92cf0f5..b959ac5f17 100644
--- a/src/backend/access/gist/gistscan.c
+++ b/src/backend/access/gist/gistscan.c
@@ -107,6 +107,7 @@ gistbeginscan(Relation r, int nkeys, int norderbys)
 	}
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 	so->curBlkno = InvalidBlockNumber;
 	so->curPageLSN = InvalidXLogRecPtr;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 0752fb38a9..2d3ae80ca7 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -20,6 +20,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "access/heapam_xlog.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "catalog/index.h"
@@ -311,7 +312,11 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 					palloc(MaxIndexTuplesPerPage * sizeof(int));
 
 			if (so->numKilled < MaxIndexTuplesPerPage)
+			{
 				so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+				IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+													&so->killedLatestRemovedXid);
+			}
 		}
 
 		/*
@@ -379,6 +384,7 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 	so->hashso_buc_split = false;
 
 	so->killedItems = NULL;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	scan->opaque = so;
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 519872850e..7f9f13f115 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -545,6 +545,7 @@ _hash_kill_items(IndexScanDesc scan)
 	OffsetNumber offnum,
 				maxoff;
 	int			numKilled = so->numKilled;
+	TransactionId killedLatestRemovedXid = so->killedLatestRemovedXid;
 	int			i;
 	bool		killedsomething = false;
 	bool		havePin = false;
@@ -558,6 +559,7 @@ _hash_kill_items(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 
 	blkno = so->currPos.currPage;
 	if (HashScanPosIsPinned(so->currPos))
@@ -611,7 +613,9 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		MarkBufferDirtyIndexHint(buf, true,
+								 scan->indexRelation,
+								 killedLatestRemovedXid);
 	}
 
 	if (so->hashso_bucket_buf == so->currPos.buf ||
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9926e2bd54..c4f55b8268 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1527,9 +1527,11 @@ heap_fetch(Relation relation,
  * the tuple here, in addition to updating *tid.  If no match is found, the
  * contents of this buffer on return are undefined.
  *
- * If all_dead is not NULL, we check non-visible tuples to see if they are
- * globally dead; *all_dead is set true if all members of the HOT chain
- * are vacuumable, false if not.
+ * If indexHintBitsData is not NULL, we check non-visible tuples to see if they
+ * are globally dead; *all_dead is set true if all members of the HOT chain
+ * are vacuumable, false if not. Also, *latest_removed_xid is set to the
+ * latest removed xid in a HOT chain, if known. *page_lsn is set to current page
+ * LSN value.
  *
  * Unlike heap_fetch, the caller must already have pin and (at least) share
  * lock on the buffer; it is still pinned/locked at exit.  Also unlike
@@ -1538,7 +1540,7 @@ heap_fetch(Relation relation,
 bool
 heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 					   Snapshot snapshot, HeapTuple heapTuple,
-					   bool *all_dead, bool first_call)
+					   IndexHintBitsData *indexHintBitsData, bool first_call)
 {
 	Page		dp = (Page) BufferGetPage(buffer);
 	TransactionId prev_xmax = InvalidTransactionId;
@@ -1550,8 +1552,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 	GlobalVisState *vistest = NULL;
 
 	/* If this is not the first call, previous call returned a (live!) tuple */
-	if (all_dead)
-		*all_dead = first_call;
+	if (indexHintBitsData)
+	{
+		indexHintBitsData->all_dead = first_call;
+		indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+		indexHintBitsData->page_lsn = PageGetLSN(dp);
+	}
 
 	blkno = ItemPointerGetBlockNumber(tid);
 	offnum = ItemPointerGetOffsetNumber(tid);
@@ -1584,6 +1590,13 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				at_chain_start = false;
 				continue;
 			}
+			/*
+			 * Even if all items are dead we are not sure about latest_removed_xid
+			 * value. In theory, some newer items of the chain could be vacuumed
+			 * while older are not (pure paranoia, probably).
+			 */
+			if (indexHintBitsData)
+				indexHintBitsData->latest_removed_xid = InvalidTransactionId;
 			/* else must be end of chain */
 			break;
 		}
@@ -1633,8 +1646,11 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				ItemPointerSetOffsetNumber(tid, offnum);
 				PredicateLockTID(relation, &heapTuple->t_self, snapshot,
 								 HeapTupleHeaderGetXmin(heapTuple->t_data));
-				if (all_dead)
-					*all_dead = false;
+				if (indexHintBitsData)
+				{
+					indexHintBitsData->all_dead = false;
+					indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+				}
 				return true;
 			}
 		}
@@ -1648,13 +1664,19 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 		 * Note: if you change the criterion here for what is "dead", fix the
 		 * planner's get_actual_variable_range() function to match.
 		 */
-		if (all_dead && *all_dead)
+		if (indexHintBitsData && indexHintBitsData->all_dead)
 		{
 			if (!vistest)
 				vistest = GlobalVisTestFor(relation);
 
 			if (!HeapTupleIsSurelyDead(heapTuple, vistest))
-				*all_dead = false;
+			{
+				indexHintBitsData->all_dead = false;
+				indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+			}
+			else
+				HeapTupleHeaderAdvanceLatestRemovedXid(heapTuple->t_data,
+											&indexHintBitsData->latest_removed_xid);
 		}
 
 		/*
@@ -7085,6 +7107,20 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 	/* *latestRemovedXid may still be invalid at end */
 }
 
+void
+IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+									TransactionId *latestRemovedXid)
+{
+	if (TransactionIdIsNormal(killedTupleRemovedXid))
+	{
+		if (!TransactionIdIsValid(*latestRemovedXid))
+			*latestRemovedXid = killedTupleRemovedXid;
+		else
+			*latestRemovedXid =
+			TransactionIdLatest(killedTupleRemovedXid, 1, latestRemovedXid);
+	}
+}
+
 #ifdef USE_PREFETCH
 /*
  * Helper function for heap_index_delete_tuples.  Issues prefetch requests for
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 4a70e20a14..6362a71d35 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -113,7 +113,8 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 						 ItemPointer tid,
 						 Snapshot snapshot,
 						 TupleTableSlot *slot,
-						 bool *call_again, bool *all_dead)
+						 bool *call_again,
+						 IndexHintBitsData *indexHintBitsData)
 {
 	IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
@@ -145,7 +146,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 											hscan->xs_cbuf,
 											snapshot,
 											&bslot->base.tupdata,
-											all_dead,
+											indexHintBitsData,
 											!*call_again);
 	bslot->base.tupdata.t_self = *tid;
 	LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1c3e937c61..359a39e1d8 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -20,7 +20,6 @@
 #include "postgres.h"
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -28,6 +27,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -106,18 +106,16 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
 	scan->xs_want_itup = false; /* may be set later */
 
 	/*
-	 * During recovery we ignore killed tuples and don't bother to kill them
-	 * either. We do this because the xmin on the primary node could easily be
-	 * later than the xmin on the standby node, so that what the primary
-	 * thinks is killed is supposed to be visible on standby. So for correct
-	 * MVCC for queries during recovery we must ignore these hints and check
-	 * all tuples. Do *not* set ignore_killed_tuples to true when running in a
-	 * transaction that was started during recovery. xactStartedInRecovery
-	 * should not be altered by index AMs.
-	 */
+	 * For correct MVCC for queries during recovery, we could use
+	 * index hint bits as on the primary. But to avoid frequent query
+	 * cancellation we do it only if hot_standby_feedback is active and
+	 * our xmin is honored on the primary.
+	 *
+	 * The decision is made in GetSnapshotIndexIgnoreKilledTuples.
+	*/
 	scan->kill_prior_tuple = false;
-	scan->xactStartedInRecovery = TransactionStartedDuringRecovery();
-	scan->ignore_killed_tuples = !scan->xactStartedInRecovery;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
+	scan->ignore_killed_tuples = MyProc->indexIgnoreKilledTuples;
 
 	scan->opaque = NULL;
 
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 3d2dbed708..dee80b8ef7 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -309,6 +309,7 @@ index_rescan(IndexScanDesc scan,
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
@@ -386,6 +387,7 @@ index_restrpos(IndexScanDesc scan)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrestrpos(scan);
@@ -534,6 +536,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 
 	/* Reset kill flag immediately for safety */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	/* If we're out of index entries, we're done */
@@ -574,12 +577,17 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 bool
 index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 {
-	bool		all_dead = false;
-	bool		found;
+	IndexHintBitsData	ihbd;
+	bool				found;
+
+	ihbd.all_dead = false;
+	ihbd.latest_removed_xid = InvalidTransactionId;
+	ihbd.page_lsn = InvalidXLogRecPtr;
 
 	found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid,
 									scan->xs_snapshot, slot,
-									&scan->xs_heap_continue, &all_dead);
+									&scan->xs_heap_continue,
+									&ihbd);
 
 	if (found)
 		pgstat_count_heap_fetch(scan->indexRelation);
@@ -587,13 +595,15 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 	/*
 	 * If we scanned a whole HOT chain and found only dead tuples, tell index
 	 * AM to kill its entry for that TID (this will take effect in the next
-	 * amgettuple call, in index_getnext_tid).  We do not do this when in
-	 * recovery because it may violate MVCC to do so.  See comments in
-	 * RelationGetIndexScan().
+	 * amgettuple call, in index_getnext_tid). We do this when in
+	 * recovery only in certain conditions because it may violate MVCC.
 	 */
-	if (!scan->xactStartedInRecovery)
-		scan->kill_prior_tuple = all_dead;
-
+	if (scan->ignore_killed_tuples)
+	{
+		scan->kill_prior_tuple = IsMarkBufferDirtyIndexHintAllowed(&ihbd);
+		scan->prior_tuple_removed_xid = scan->kill_prior_tuple ?
+								ihbd.latest_removed_xid : InvalidTransactionId;
+	}
 	return found;
 }
 
@@ -667,6 +677,7 @@ index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap)
 
 	/* just make sure this is false... */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 
 	/*
 	 * have the am's getbitmap proc do all the work.
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e333603912..954cbe5562 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -502,7 +502,11 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 			if (inposting || !ItemIdIsDead(curitemid))
 			{
 				ItemPointerData htid;
-				bool		all_dead = false;
+				IndexHintBitsData ihbd;
+
+				ihbd.all_dead = false;
+				ihbd.latest_removed_xid = InvalidTransactionId;
+				ihbd.page_lsn = InvalidXLogRecPtr;
 
 				if (!inposting)
 				{
@@ -556,7 +560,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				 */
 				else if (table_index_fetch_tuple_check(heapRel, &htid,
 													   &SnapshotDirty,
-													   &all_dead))
+													   &ihbd))
 				{
 					TransactionId xwait;
 
@@ -670,7 +674,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 													RelationGetRelationName(rel))));
 					}
 				}
-				else if (all_dead && (!inposting ||
+				else if (ihbd.all_dead && (!inposting ||
 									  (prevalldead &&
 									   curposti == BTreeTupleGetNPosting(curitup) - 1)))
 				{
@@ -687,16 +691,18 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 					 * crucial. Be sure to mark the proper buffer dirty.
 					 */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						MarkBufferDirtyIndexHint(nbuf, true,
+												 rel, ihbd.latest_removed_xid);
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						MarkBufferDirtyIndexHint(insertstate->buf, true,
+												 rel, ihbd.latest_removed_xid);
 				}
 
 				/*
 				 * Remember if posting list tuple has even a single HOT chain
 				 * whose members are not all dead
 				 */
-				if (!all_dead && inposting)
+				if (!ihbd.all_dead && inposting)
 					prevalldead = false;
 			}
 		}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 289bd3c15d..c35a34003b 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -21,7 +21,7 @@
 #include "access/nbtree.h"
 #include "access/nbtxlog.h"
 #include "access/relscan.h"
-#include "access/xlog.h"
+#include "access/heapam_xlog.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
@@ -272,7 +272,11 @@ btgettuple(IndexScanDesc scan, ScanDirection dir)
 					so->killedItems = (int *)
 						palloc(MaxTIDsPerBTreePage * sizeof(int));
 				if (so->numKilled < MaxTIDsPerBTreePage)
+				{
 					so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 
 			/*
@@ -378,6 +382,7 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 	so->arrayContext = NULL;
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index d524310723..bfa1c357c3 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1724,6 +1724,7 @@ _bt_killitems(IndexScanDesc scan)
 	OffsetNumber maxoff;
 	int			i;
 	int			numKilled = so->numKilled;
+	TransactionId killedLatestRemovedXid = so->killedLatestRemovedXid;
 	bool		killedsomething = false;
 	bool		droppedpin PG_USED_FOR_ASSERTS_ONLY;
 
@@ -1734,6 +1735,7 @@ _bt_killitems(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 
 	if (BTScanPosIsPinned(so->currPos))
 	{
@@ -1883,7 +1885,9 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(so->currPos.buf, true);
+		MarkBufferDirtyIndexHint(so->currPos.buf, true,
+								 scan->indexRelation,
+								 killedLatestRemovedXid);
 	}
 
 	_bt_unlockbuf(scan->indexRelation, so->currPos.buf);
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 01ee7ac6d2..32ebd730f1 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -36,6 +36,16 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 		appendStringInfoString(buf, "; subxid ovf");
 }
 
+static void
+standby_desc_index_hint_bits_horizon(StringInfo buf,
+									 xl_index_hint_bits_horizon *xlrec)
+{
+	char		*path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+	appendStringInfo(buf, "latestRemovedXid %u in %s",
+					 xlrec->latestRemovedXid, path);
+}
+
 void
 standby_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -66,6 +76,12 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 								   xlrec->dbId, xlrec->tsId,
 								   xlrec->relcacheInitFileInval);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON)
+	{
+		xl_index_hint_bits_horizon *xlrec = (xl_index_hint_bits_horizon *) rec;
+
+		standby_desc_index_hint_bits_horizon(buf, xlrec);
+	}
 }
 
 const char *
@@ -84,6 +100,9 @@ standby_identify(uint8 info)
 		case XLOG_INVALIDATIONS:
 			id = "INVALIDATIONS";
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			id = "INDEX_HINT_BITS_HORIZON";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 5ea5bdd810..bb9a0ddc18 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -219,7 +219,7 @@ bool
 table_index_fetch_tuple_check(Relation rel,
 							  ItemPointer tid,
 							  Snapshot snapshot,
-							  bool *all_dead)
+							  IndexHintBitsData *indexHintBitsData)
 {
 	IndexFetchTableData *scan;
 	TupleTableSlot *slot;
@@ -229,7 +229,7 @@ table_index_fetch_tuple_check(Relation rel,
 	slot = table_slot_create(rel, NULL);
 	scan = table_index_fetch_begin(rel);
 	found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
-									all_dead);
+									indexHintBitsData);
 	table_index_fetch_end(scan);
 	ExecDropSingleTupleTableSlot(slot);
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..f4c1f830d6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4104,6 +4104,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT:
 			event_name = "RecoveryConflictSnapshot";
 			break;
+		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS:
+			event_name = "RecoveryConflictSnapshotIndexHintBits";
+			break;
 		case WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE:
 			event_name = "RecoveryConflictTablespace";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df00d0..8289f055b0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -411,6 +411,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
 			 */
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e903e561af..401a7d5693 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -577,6 +577,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 #endif
 
 	MyProc->xmin = snap->xmin;
+	// to keep it simple use index hint bits on the primary only
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/* allocate in transaction context */
 	newxip = (TransactionId *)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 723f513d8b..1199dffc45 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -225,6 +225,9 @@ WalReceiverMain(void)
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
 	walrcv->walRcvState = WALRCV_STREAMING;
+	/* Initially true so we always send at least one feedback message */
+	walrcv->sender_has_standby_xmin = true;
+	walrcv->sender_propagates_feedback_to_primary = false;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -806,6 +809,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 	XLogRecPtr	walEnd;
 	TimestampTz sendTime;
 	bool		replyRequested;
+	bool		senderPropagatesFeedbackToPrimary;
 
 	resetStringInfo(&incoming_message);
 
@@ -835,7 +839,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 		case 'k':				/* Keepalive */
 			{
 				/* copy message to StringInfo */
-				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(char);
 				if (len != hdrlen)
 					ereport(ERROR,
 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -846,8 +850,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = pq_getmsgint64(&incoming_message);
 				replyRequested = pq_getmsgbyte(&incoming_message);
+				senderPropagatesFeedbackToPrimary = pq_getmsgbyte(&incoming_message);
 
 				ProcessWalSndrMessage(walEnd, sendTime);
+				WalRcv->sender_propagates_feedback_to_primary = senderPropagatesFeedbackToPrimary;
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
@@ -1110,15 +1116,13 @@ XLogWalRcvSendHSFeedback(bool immed)
 				catalog_xmin;
 	static TimestampTz sendTime = 0;
 
-	/* initially true so we always send at least one feedback message */
-	static bool primary_has_standby_xmin = true;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
 	 * to exit before doing anything at all.
 	 */
 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
-		!primary_has_standby_xmin)
+		!WalRcv->sender_has_standby_xmin)
 		return;
 
 	/* Get current timestamp. */
@@ -1188,9 +1192,9 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint32(&reply_message, catalog_xmin_epoch);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
-		primary_has_standby_xmin = true;
+		WalRcv->sender_has_standby_xmin = true;
 	else
-		primary_has_standby_xmin = false;
+		WalRcv->sender_has_standby_xmin = false;
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8545c6c423..71fe08e1ab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,6 +2169,12 @@ ProcessStandbyHSFeedbackMessage(void)
 		else
 			MyProc->xmin = feedbackXmin;
 	}
+
+	/*
+	 * Always send keep-alive after feedback to allow standby to maintain
+	 * WalRcv->sender_propagates_feedback_to_primary.
+	 */
+	WalSndKeepalive(false);
 }
 
 /*
@@ -3450,7 +3456,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply)
 {
+	bool am_propagating_feedback_to_primary;
 	elog(DEBUG2, "sending replication keepalive");
+	am_propagating_feedback_to_primary = !am_cascading_walsender
+		|| (WalRcv->sender_has_standby_xmin && WalRcv->sender_propagates_feedback_to_primary);
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
@@ -3458,6 +3467,7 @@ WalSndKeepalive(bool requestReply)
 	pq_sendint64(&output_message, sentPtr);
 	pq_sendint64(&output_message, GetCurrentTimestamp());
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
+	pq_sendbyte(&output_message, am_propagating_feedback_to_primary ? 1 : 0);
 
 	/* ... and send it wrapped in CopyData */
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 561c212092..63e60613c4 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3898,6 +3898,56 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	}
 }
 
+/*
+ * MarkBufferDirtyIndexHint
+ *
+ * This is essentially the same as MarkBufferDirtyHint, except it WAL log
+ * new value for index hint bits horizon if required.
+ *
+ * Should be used instead of MarkBufferDirtyHint for LP_DEAD hints in indexes.
+ */
+void
+MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+						 Relation rel, TransactionId latestRemovedXid)
+{
+	LogIndexHintBitsHorizonIfNeeded(rel, latestRemovedXid);
+	MarkBufferDirtyHint(buffer, buffer_std);
+}
+
+/*
+ * IsMarkBufferDirtyIndexHintAllowed
+ *
+ * Checks is it allowed to set index hint bit for the tuple.
+ */
+bool
+IsMarkBufferDirtyIndexHintAllowed(IndexHintBitsData *indexHintBitsData)
+{
+	if (!indexHintBitsData->all_dead)
+		return false;
+	// it all always allowed on primary if *all_dead
+	if (!RecoveryInProgress())
+		return true;
+
+	if (TransactionIdIsValid(indexHintBitsData->latest_removed_xid)) {
+		/*
+		 * If latest_removed_xid is known - make sure its commit record
+		 * less than minRecoveryPoint to avoid MVCC failure after crash recovery.
+		 */
+		XLogRecPtr commitLSN
+				= TransactionIdGetCommitLSN(indexHintBitsData->latest_removed_xid);
+
+		return !XLogNeedsFlush(commitLSN);
+	} else {
+		/*
+		 * Looks like it is tuple cleared by heap_page_prune_execute,
+		 * so conflict resolution already done. But we must be sure if
+		 * LSN of XLOG_HEAP2_CLEAN (or any subsequent updates) less than
+		 * minRecoveryPoint to avoid MVCC failure after crash recovery.
+		 */
+		return !XLogNeedsFlush(indexHintBitsData->page_lsn);
+	}
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index f9bbe97b50..ce1b8f628a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -267,6 +267,7 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	StandByShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cf12eda504..07863dbb2d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -65,8 +65,10 @@
 #include "utils/builtins.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "replication/walreceiver.h"
 
 #define UINT32_ACCESS_ONCE(var)		 ((uint32)(*((volatile uint32 *)&(var))))
+#define BOOL_ACCESS_ONCE(var)		 ((bool)(*((volatile bool *)&(var))))
 
 /* Our shared memory area */
 typedef struct ProcArrayStruct
@@ -655,6 +657,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
 		proc->lxid = InvalidLocalTransactionId;
 		proc->xmin = InvalidTransactionId;
+		proc->indexIgnoreKilledTuples = false;
 		proc->delayChkpt = false;	/* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
 
@@ -694,6 +697,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
 	proc->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->delayChkpt = false;	/* be sure this is cleared in abort */
 	proc->recoveryConflictPending = false;
 
@@ -877,6 +881,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->recoveryConflictPending = false;
 
 	Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK));
@@ -2013,6 +2018,23 @@ GetSnapshotDataInitOldSnapshot(Snapshot snapshot)
 	}
 }
 
+static bool
+GetSnapshotIndexIgnoreKilledTuples(Snapshot snapshot)
+{
+	/*
+	 * Always use and set LP_DEAD bits on primary. In case of standby
+	 * only if hot_standby_feedback enabled, walsender has our xmin
+	 * and walsender propagates feedback up to the primary (to avoid
+	 * unnecessary cancellations).
+	 *
+	 * It is always safe to set it to true but could cause high
+	 * rate of conflicts.
+	*/
+	Assert(!RecoveryInProgress() || WalRcv);
+	return !snapshot->takenDuringRecovery ||
+		(WalRcv->sender_propagates_feedback_to_primary && WalRcv->sender_has_standby_xmin);
+}
+
 /*
  * Helper function for GetSnapshotData() that checks if the bulk of the
  * visibility information in the snapshot is still valid. If so, it updates
@@ -2057,7 +2079,10 @@ GetSnapshotDataReuse(Snapshot snapshot)
 	 * xmin.
 	 */
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = snapshot->xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	RecentXmin = snapshot->xmin;
 	Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
@@ -2345,7 +2370,10 @@ GetSnapshotData(Snapshot snapshot)
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	LWLockRelease(ProcArrayLock);
 
@@ -2524,6 +2552,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
 		 * we don't check that.)
 		 */
 		MyProc->xmin = TransactionXmin = xmin;
+		// no need to change indexIgnoreKilledTuples because restriction is relaxed.
 
 		result = true;
 		break;
@@ -2567,6 +2596,8 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
 		TransactionIdPrecedesOrEquals(xid, xmin))
 	{
 		MyProc->xmin = TransactionXmin = xmin;
+		// we could also copy indexIgnoreKilledTuples, could be useful for parallel scans
+		MyProc->indexIgnoreKilledTuples = proc->indexIgnoreKilledTuples;
 		result = true;
 	}
 
@@ -3245,11 +3276,15 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
  *
  * If dbOid is valid we skip backends attached to other databases.
  *
+ * If onlyIndexIgnoreKilledTuples is true we include only backends
+ * with indexIgnoreKilledTuples set.
+ *
  * Be careful to *not* pfree the result from this function. We reuse
  * this array sufficiently often that we use malloc for the result.
  */
 VirtualTransactionId *
-GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+						  bool onlyIndexIgnoreKilledTuples)
 {
 	static VirtualTransactionId *vxids;
 	ProcArrayStruct *arrayP = procArray;
@@ -3287,6 +3322,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 		{
 			/* Fetch xmin just once - can't change on us, but good coding */
 			TransactionId pxmin = UINT32_ACCESS_ONCE(proc->xmin);
+			bool indexIgnoreKilledTuples =
+				BOOL_ACCESS_ONCE(proc->indexIgnoreKilledTuples);
 
 			/*
 			 * We ignore an invalid pxmin because this means that backend has
@@ -3297,7 +3334,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 			 * test here.
 			 */
 			if (!TransactionIdIsValid(limitXmin) ||
-				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
+				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin) &&
+					(!onlyIndexIgnoreKilledTuples || indexIgnoreKilledTuples)))
 			{
 				VirtualTransactionId vxid;
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 39a30c00f7..3cffd64161 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -42,6 +42,7 @@ int			max_standby_streaming_delay = 30 * 1000;
 bool		log_recovery_conflict_waits = false;
 
 static HTAB *RecoveryLockLists;
+static HTAB *IndexHintBitsHorizons;
 
 /* Flags set by timeout handlers */
 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
@@ -65,6 +66,12 @@ typedef struct RecoveryLockListsEntry
 	List	   *locks;
 } RecoveryLockListsEntry;
 
+typedef struct IndexHintBitsHorizonsEntry
+{
+	Oid				dbOid;
+	TransactionId	hintHorizonXid;
+} IndexHintBitsHorizonsEntry;
+
 /*
  * InitRecoveryTransactionEnvironment
  *		Initialize tracking of our primary's in-progress transactions.
@@ -425,7 +432,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -444,7 +452,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 		return;
 
 	backends = GetConflictingVirtualXIDs(latestRemovedXid,
-										 node.dbNode);
+										 node.dbNode, false);
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
@@ -452,6 +460,22 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 										   true);
 }
 
+void
+ResolveIndexHintBitsRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+												 RelFileNode node)
+{
+	VirtualTransactionId *backends;
+
+	backends = GetConflictingVirtualXIDs(latestRemovedXid,
+										 node.dbNode, true);
+
+	ResolveRecoveryConflictWithVirtualXIDs(
+			backends,
+			PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+			WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS,
+			true);
+}
+
 void
 ResolveRecoveryConflictWithTablespace(Oid tsid)
 {
@@ -475,7 +499,7 @@ ResolveRecoveryConflictWithTablespace(Oid tsid)
 	 * We don't wait for commit because drop tablespace is non-transactional.
 	 */
 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
-												InvalidOid);
+												InvalidOid, false);
 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 										   WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
@@ -1026,6 +1050,43 @@ StandbyReleaseOldLocks(TransactionId oldxid)
 	}
 }
 
+static bool
+IsNewerIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+	bool found, result;
+	IndexHintBitsHorizonsEntry* entry;
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_SHARED);
+	entry = (IndexHintBitsHorizonsEntry *) hash_search(IndexHintBitsHorizons, &dbOid,
+													   HASH_FIND, &found);
+
+	result = !found || TransactionIdPrecedes(entry->hintHorizonXid, latestRemovedXid);
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+
+	return result;
+}
+
+static void
+UpsertLatestIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+
+	bool found;
+	IndexHintBitsHorizonsEntry* entry;
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+
+	entry = (IndexHintBitsHorizonsEntry *) hash_search(IndexHintBitsHorizons, &dbOid,
+													   HASH_ENTER, &found);
+
+	if (!found || TransactionIdPrecedes(entry->hintHorizonXid, latestRemovedXid))
+		entry->hintHorizonXid = latestRemovedXid;
+
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
+
+
 /*
  * --------------------------------------------------------------------
  *		Recovery handling for Rmgr RM_STANDBY_ID
@@ -1081,6 +1142,16 @@ standby_redo(XLogReaderState *record)
 											 xlrec->dbId,
 											 xlrec->tsId);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON) {
+		if (InHotStandby) {
+			xl_index_hint_bits_horizon *xlrec =
+					(xl_index_hint_bits_horizon *) XLogRecGetData(record);
+
+			ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												xlrec->latestRemovedXid,
+												xlrec->rnode);
+		}
+	}
 	else
 		elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1381,3 +1452,49 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 
 	return reasonDesc;
 }
+
+static void
+LogIndexHintBitsHorizon(RelFileNode rnode, TransactionId latestRemovedXid)
+{
+	xl_index_hint_bits_horizon xlrec;
+
+	xlrec.rnode = rnode;
+	xlrec.latestRemovedXid = latestRemovedXid;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_index_hint_bits_horizon));
+
+	XLogInsert(RM_STANDBY_ID, XLOG_INDEX_HINT_BITS_HORIZON);
+}
+
+void
+LogIndexHintBitsHorizonIfNeeded(Relation rel, TransactionId latestRemovedXid)
+{
+	if (!RecoveryInProgress() && XLogStandbyInfoActive() &&
+			TransactionIdIsNormal(latestRemovedXid) && RelationNeedsWAL(rel)) {
+		if (IsNewerIndexHintBitsHorizonXid(rel->rd_node.dbNode, latestRemovedXid))
+		{
+			LogIndexHintBitsHorizon(rel->rd_node, latestRemovedXid);
+			UpsertLatestIndexHintBitsHorizonXid(rel->rd_node.dbNode,
+												latestRemovedXid);
+		}
+	}
+}
+
+void
+StandByShmemInit(void)
+{
+	HASHCTL		info;
+
+	MemSet(&info, 0, sizeof(info));
+	info.keysize = sizeof(Oid);
+	info.entrysize = sizeof(IndexHintBitsHorizonsEntry);
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+
+	IndexHintBitsHorizons = ShmemInitHash("IndexHintBitsHorizons",
+										  64, 64,
+										  &info, HASH_ELEM | HASH_BLOBS);
+
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..dcaecee8f1 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,4 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+IndexHintBitsHorizonShmemLock		48
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index c87ffc6549..2da7eb69da 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -386,6 +386,7 @@ InitProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->pid = MyProcPid;
 	/* backendId, databaseId and roleId will be filled in later */
 	MyProc->backendId = InvalidBackendId;
@@ -569,6 +570,7 @@ InitAuxiliaryProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
 	MyProc->roleId = InvalidOid;
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index ae16c3ed7d..bed98d6436 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -528,6 +528,10 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
 	 * the state for GlobalVis*.
 	 */
 	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+	/* To keep it simple, use index hint bits only on the primary for imported
+	 * snapshots.
+	 */
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/*
 	 * Now copy appropriate fields from the source snapshot.
@@ -932,6 +936,7 @@ SnapshotResetXmin(void)
 	if (pairingheap_is_empty(&RegisteredSnapshots))
 	{
 		MyProc->xmin = InvalidTransactionId;
+		MyProc->indexIgnoreKilledTuples = false;
 		return;
 	}
 
@@ -939,6 +944,7 @@ SnapshotResetXmin(void)
 										pairingheap_first(&RegisteredSnapshots));
 
 	if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
+		// no need to change indexIgnoreKilledTuples here because xmin restriction is relaxed
 		MyProc->xmin = minSnapshot->xmin;
 }
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 553d364e2d..97c97c13c2 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -165,8 +165,9 @@ typedef struct GISTScanOpaqueData
 	IndexOrderByDistance *distances;	/* output area for gistindex_keytest */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	OffsetNumber *killedItems;	/* offset numbers of killed items */
-	int			numKilled;		/* number of currently stored items */
+	OffsetNumber *killedItems;			  /* offset numbers of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 	BlockNumber curBlkno;		/* current number of block */
 	GistNSN		curPageLSN;		/* pos in the WAL stream when page was read */
 
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 1cce865be2..a3fc82192e 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -177,8 +177,9 @@ typedef struct HashScanOpaqueData
 	 */
 	bool		hashso_buc_split;
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int			 *killedItems;			  /* currPos.items indexes of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 
 	/*
 	 * Identify all the matching items on a page and save them in
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index d96a47b1ce..7ffaac53ec 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -126,7 +126,7 @@ extern bool heap_fetch(Relation relation, Snapshot snapshot,
 					   HeapTuple tuple, Buffer *userbuf);
 extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 								   Buffer buffer, Snapshot snapshot, HeapTuple heapTuple,
-								   bool *all_dead, bool first_call);
+								   IndexHintBitsData *indexHintBitsData, bool first_call);
 
 extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid);
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 178d49710a..b49c3b4dc7 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -387,6 +387,8 @@ typedef struct xl_heap_rewrite_mapping
 
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 												   TransactionId *latestRemovedXid);
+extern void IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+												TransactionId *latestRemovedXid);
 
 extern void heap_redo(XLogReaderState *record);
 extern void heap_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index cad4f2bdeb..10257821fa 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -920,8 +920,9 @@ typedef struct BTScanOpaqueData
 	MemoryContext arrayContext; /* scan-lifespan context for array data */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int				*killedItems;			/* currPos.items indexes of killed items */
+	TransactionId	 killedLatestRemovedXid;/* latest removed xid of all killed items */
+	int				 numKilled;				/* number of currently stored items */
 
 	/*
 	 * If we are doing an index-only scan, these are the tuple storage
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 005f3fdd2b..82383f02f3 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -122,10 +122,9 @@ typedef struct IndexScanDescData
 	bool		xs_temp_snap;	/* unregister snapshot at scan end? */
 
 	/* signaling to index AM about killing index tuples */
-	bool		kill_prior_tuple;	/* last-returned tuple is dead */
-	bool		ignore_killed_tuples;	/* do not return killed entries */
-	bool		xactStartedInRecovery;	/* prevents killing/seeing killed
-										 * tuples */
+	bool			kill_prior_tuple;		 /* last-returned tuple is dead */
+	TransactionId	prior_tuple_removed_xid; /* removed fix for dead tuple */
+	bool			ignore_killed_tuples;	 /* do not return killed entries */
 
 	/* index access method's private state */
 	void	   *opaque;			/* access-method-specific info */
@@ -185,4 +184,12 @@ typedef struct SysScanDescData
 	struct TupleTableSlot *slot;
 }			SysScanDescData;
 
+/* Struct for data about visibility of tuple */
+typedef struct IndexHintBitsData
+{
+	bool			all_dead;			/* guaranteed not visible for all backends */
+	TransactionId	latest_removed_xid;	/* latest removed xid if known */
+	XLogRecPtr		page_lsn;			/* lsn of page where dead tuple located */
+}			IndexHintBitsData;
+
 #endif							/* RELSCAN_H */
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 33bffb6815..31659fb1c4 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -394,7 +394,7 @@ typedef struct TableAmRoutine
 	 * needs to be set to true by index_fetch_tuple, signaling to the caller
 	 * that index_fetch_tuple should be called again for the same tid.
 	 *
-	 * *all_dead, if all_dead is not NULL, should be set to true by
+	 * *indexHintBitsData, if value is not NULL, should be filled by
 	 * index_fetch_tuple iff it is guaranteed that no backend needs to see
 	 * that tuple. Index AMs can use that to avoid returning that tid in
 	 * future searches.
@@ -403,7 +403,8 @@ typedef struct TableAmRoutine
 									  ItemPointer tid,
 									  Snapshot snapshot,
 									  TupleTableSlot *slot,
-									  bool *call_again, bool *all_dead);
+									  bool *call_again,
+									  IndexHintBitsData *indexHintBitsData);
 
 
 	/* ------------------------------------------------------------------------
@@ -1107,7 +1108,7 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
  * will be set to true, signaling that table_index_fetch_tuple() should be called
  * again for the same tid.
  *
- * *all_dead, if all_dead is not NULL, will be set to true by
+ * *index_hint_bits_data, if value is not NULL, will be filled by
  * table_index_fetch_tuple() iff it is guaranteed that no backend needs to see
  * that tuple. Index AMs can use that to avoid returning that tid in future
  * searches.
@@ -1124,7 +1125,8 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 						ItemPointer tid,
 						Snapshot snapshot,
 						TupleTableSlot *slot,
-						bool *call_again, bool *all_dead)
+						bool *call_again, 
+						IndexHintBitsData *index_hint_bits_data)
 {
 	/*
 	 * We don't expect direct calls to table_index_fetch_tuple with valid
@@ -1136,7 +1138,7 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 
 	return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
 													slot, call_again,
-													all_dead);
+													index_hint_bits_data);
 }
 
 /*
@@ -1148,7 +1150,7 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 extern bool table_index_fetch_tuple_check(Relation rel,
 										  ItemPointer tid,
 										  Snapshot snapshot,
-										  bool *all_dead);
+										  IndexHintBitsData *indexHintBitsData);
 
 
 /* ------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..ac649703cd 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -992,6 +992,7 @@ typedef enum
 	WAIT_EVENT_PROC_SIGNAL_BARRIER,
 	WAIT_EVENT_PROMOTE,
 	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
+	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS,
 	WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
 	WAIT_EVENT_RECOVERY_PAUSE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4313f516d3..0371223c1e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -156,6 +156,12 @@ typedef struct
 	 * store semantics, so use sig_atomic_t.
 	 */
 	sig_atomic_t force_reply;	/* used as a bool */
+
+	/* If sender has received our xmin. */
+	sig_atomic_t sender_has_standby_xmin;
+
+	/* Is senders feedback propagated through cascading replication chain up to the primary. */
+	sig_atomic_t sender_propagates_feedback_to_primary;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index fb00fda6a7..7cb4a92f9d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -61,6 +61,8 @@ struct WritebackContext;
 /* forward declared, to avoid including smgr.h here */
 struct SMgrRelationData;
 
+struct IndexHintBitsData;
+
 /* in globals.c ... this duplicates miscadmin.h */
 extern PGDLLIMPORT int NBuffers;
 
@@ -222,6 +224,9 @@ extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
 						 ForkNumber *forknum, BlockNumber *blknum);
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
+extern void MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+									 Relation rel, TransactionId latestRemovedXid);
+extern bool IsMarkBufferDirtyIndexHintAllowed(struct IndexHintBitsData	*indexHintBitsData);
 
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 683ab64f76..0a72160b61 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -165,6 +165,11 @@ struct PGPROC
 	 * though not required. Accessed without lock, if needed.
 	 */
 	bool		recoveryConflictPending;
+	/*
+	*  Flag allowing to read\set LP_DEAD bits in indexes.
+	*  Also used to raise recovery conflicts caused by index hint bits.
+	*/
+	bool		indexIgnoreKilledTuples;
 
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index b01fa52139..3b922f3fcb 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -70,7 +70,8 @@ extern bool IsBackendPid(int pid);
 extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
 												   bool excludeXmin0, bool allDbs, int excludeVacuum,
 												   int *nvxids);
-extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid);
+extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+													   bool onlyIndexIgnoreKilledTuples);
 extern pid_t CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode);
 extern pid_t SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
 									  bool conflictPending);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 94d33851d0..ed984082bf 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -19,6 +19,7 @@
 #include "storage/procsignal.h"
 #include "storage/relfilenode.h"
 #include "storage/standbydefs.h"
+#include "utils/relcache.h"
 
 /* User-settable GUC parameters */
 extern int	vacuum_defer_cleanup_age;
@@ -31,6 +32,9 @@ extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
 												RelFileNode node);
+extern void ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												TransactionId latestRemovedXid,
+												RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
@@ -93,4 +97,8 @@ extern XLogRecPtr LogStandbySnapshot(void);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 									bool relcacheInitFileInval);
 
+extern void StandByShmemInit(void);
+extern void LogIndexHintBitsHorizonIfNeeded(Relation rel,
+											TransactionId latestRemovedXid);
+
 #endif							/* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index d99e6f40c6..127de2e9eb 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -31,9 +31,10 @@ extern void standby_desc_invalidations(StringInfo buf,
 /*
  * XLOG message types
  */
-#define XLOG_STANDBY_LOCK			0x00
-#define XLOG_RUNNING_XACTS			0x10
-#define XLOG_INVALIDATIONS			0x20
+#define XLOG_STANDBY_LOCK				0x00
+#define XLOG_RUNNING_XACTS				0x10
+#define XLOG_INVALIDATIONS				0x20
+#define XLOG_INDEX_HINT_BITS_HORIZON	0x30
 
 typedef struct xl_standby_locks
 {
@@ -71,4 +72,10 @@ typedef struct xl_invalidations
 
 #define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
 
+typedef struct xl_index_hint_bits_horizon
+{
+	RelFileNode		rnode;
+	TransactionId	latestRemovedXid;
+} xl_index_hint_bits_horizon;
+
 #endif							/* STANDBYDEFS_H */

Reply via email to