   . rename variables
     . cur_buffer -> dst_buffer
     . ToPage -> dst_page
     . cur_page -> dst_vacpage
   . move variable declarations into block where variable is used
   . various Asserts instead of elog(ERROR, ...)
   . extract functionality from repair_frag() into new routines
     . move_chain_tuple()
     . move_plain_tuple()
     . update_hint_bits()
   . create type ExecContext
   . add comments

This patch is not intended to change any behaviour.  It passes make
check, make installcheck and some manual tests.  It might be hard to
review, because some lines are affected by more than one change.  If
it's too much to swallow at once, I can provide it in smaller chunks ...

Servus
 Manfred
diff -Ncr ../base/src/backend/commands/vacuum.c src/backend/commands/vacuum.c
*** ../base/src/backend/commands/vacuum.c       Mon May 31 21:24:05 2004
--- src/backend/commands/vacuum.c       Wed Jun  2 21:46:59 2004
***************
*** 99,104 ****
--- 99,162 ----
        VTupleLink      vtlinks;
  } VRelStats;
  
+ /*----------------------------------------------------------------------
+  * ExecContext:
+  *
+  * As these variables always appear together, we put them into one struct
+  * and pull initialization and cleanup into separate routines.
+  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
+  * accurately:  It is *used* only in move_xxx_tuple(), but because this
+  * routine is called many times, we initialize the struct just once in
+  * repair_frag() and pass it on to move_xxx_tuple().
+  */
+ typedef struct ExecContextData
+ {
+       ResultRelInfo *resultRelInfo;
+       EState     *estate;
+       TupleTable      tupleTable;
+       TupleTableSlot *slot;
+ } ExecContextData;
+ typedef ExecContextData *ExecContext;
+ 
+ static void
+ ExecContext_Init(ExecContext ec, Relation rel)
+ {
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+ 
+       /*
+        * We need a ResultRelInfo and an EState so we can use the regular
+        * executor's index-entry-making machinery.
+        */
+       ec->estate = CreateExecutorState();
+ 
+       ec->resultRelInfo = makeNode(ResultRelInfo);
+       ec->resultRelInfo->ri_RangeTableIndex = 1;              /* dummy */
+       ec->resultRelInfo->ri_RelationDesc = rel;
+       ec->resultRelInfo->ri_TrigDesc = NULL;  /* we don't fire triggers */
+ 
+       ExecOpenIndices(ec->resultRelInfo);
+ 
+       ec->estate->es_result_relations = ec->resultRelInfo;
+       ec->estate->es_num_result_relations = 1;
+       ec->estate->es_result_relation_info = ec->resultRelInfo;
+ 
+       /* Set up a dummy tuple table too */
+       ec->tupleTable = ExecCreateTupleTable(1);
+       ec->slot = ExecAllocTableSlot(ec->tupleTable);
+       ExecSetSlotDescriptor(ec->slot, tupdesc, false);
+ }
+ 
+ static void
+ ExecContext_Finish(ExecContext ec)
+ {
+       ExecDropTupleTable(ec->tupleTable, true);
+       ExecCloseIndices(ec->resultRelInfo);
+       FreeExecutorState(ec->estate);
+ }
+ /*
+  * End of ExecContext Implementation
+  *----------------------------------------------------------------------
+  */
  
  static MemoryContext vac_context = NULL;
  
***************
*** 122,127 ****
--- 180,196 ----
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
+ static void move_chain_tuple(Relation rel,
+                                        Buffer old_buf, Page old_page, HeapTuple 
old_tup,
+                                        Buffer dst_buf, Page dst_page, VacPage 
dst_vacpage,
+                                        ExecContext ec, ItemPointer ctid, bool 
cleanVpd);
+ static void move_plain_tuple(Relation rel,
+                                        Buffer old_buf, Page old_page, HeapTuple 
old_tup,
+                                        Buffer dst_buf, Page dst_page, VacPage 
dst_vacpage,
+                                        ExecContext ec);
+ static void update_hint_bits(Relation rel, VacPageList fraged_pages,
+                                       int num_fraged_pages, BlockNumber 
last_move_dest_block,
+                                       int num_moved);
  static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
  static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
***************
*** 675,681 ****
  static void
  vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
!       TransactionId myXID;
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
--- 744,750 ----
  static void
  vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
!       TransactionId myXID = GetCurrentTransactionId();
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
***************
*** 683,689 ****
        bool            vacuumAlreadyWrapped = false;
        bool            frozenAlreadyWrapped = false;
  
-       myXID = GetCurrentTransactionId();
  
        relation = heap_openr(DatabaseRelationName, AccessShareLock);
  
--- 752,757 ----
***************
*** 1059,1075 ****
  {
        BlockNumber nblocks,
                                blkno;
-       ItemId          itemid;
-       Buffer          buf;
        HeapTupleData tuple;
-       OffsetNumber offnum,
-                               maxoff;
-       bool            pgchanged,
-                               tupgone,
-                               notup;
        char       *relname;
!       VacPage         vacpage,
!                               vacpagecopy;
        BlockNumber empty_pages,
                                empty_end_pages;
        double          num_tuples,
--- 1127,1135 ----
  {
        BlockNumber nblocks,
                                blkno;
        HeapTupleData tuple;
        char       *relname;
!       VacPage         vacpage;
        BlockNumber empty_pages,
                                empty_end_pages;
        double          num_tuples,
***************
*** 1080,1086 ****
                                usable_free_space;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
-       int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
--- 1140,1145 ----
***************
*** 1113,1118 ****
--- 1172,1182 ----
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;
+               Buffer          buf;
+               OffsetNumber offnum,
+                                       maxoff;
+               bool            pgchanged,
+                                       notup;
  
                vacuum_delay_point();
  
***************
*** 1125,1130 ****
--- 1189,1196 ----
  
                if (PageIsNew(page))
                {
+                       VacPage vacpagecopy;
+ 
                        ereport(WARNING,
                        (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
                                        relname, blkno)));
***************
*** 1142,1147 ****
--- 1208,1215 ----
  
                if (PageIsEmpty(page))
                {
+                       VacPage vacpagecopy;
+ 
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) 
page)->pd_lower;
                        free_space += vacpage->free;
                        empty_pages++;
***************
*** 1161,1168 ****
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;
! 
!                       itemid = PageGetItemId(page, offnum);
  
                        /*
                         * Collect un-used items too - it's possible to have indexes
--- 1229,1236 ----
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;
!                       ItemId          itemid = PageGetItemId(page, offnum);
!                       bool            tupgone = false;
  
                        /*
                         * Collect un-used items too - it's possible to have indexes
***************
*** 1180,1186 ****
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
  
-                       tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;
  
                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
--- 1248,1253 ----
***************
*** 1269,1275 ****
                                        do_shrinking = false;
                                        break;
                                default:
!                                       elog(ERROR, "unexpected 
HeapTupleSatisfiesVacuum result");
                                        break;
                        }
  
--- 1336,1343 ----
                                        do_shrinking = false;
                                        break;
                                default:
!                                       /* unexpected HeapTupleSatisfiesVacuum result 
*/
!                                       Assert(false);
                                        break;
                        }
  
***************
*** 1344,1350 ****
  
                if (do_reap || do_frag)
                {
!                       vacpagecopy = copy_vac_page(vacpage);
                        if (do_reap)
                                vpage_insert(vacuum_pages, vacpagecopy);
                        if (do_frag)
--- 1412,1418 ----
  
                if (do_reap || do_frag)
                {
!                       VacPage vacpagecopy = copy_vac_page(vacpage);
                        if (do_reap)
                                vpage_insert(vacuum_pages, vacpagecopy);
                        if (do_frag)
***************
*** 1390,1395 ****
--- 1458,1465 ----
         */
        if (do_shrinking)
        {
+               int                     i;
+ 
                Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
                fraged_pages->num_pages -= empty_end_pages;
                usable_free_space = 0;
***************
*** 1453,1528 ****
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel)
  {
!       TransactionId myXID;
!       CommandId       myCID;
!       Buffer          buf,
!                               cur_buffer;
        BlockNumber nblocks,
                                blkno;
        BlockNumber last_move_dest_block = 0,
                                last_vacuum_block;
!       Page            page,
!                               ToPage = NULL;
!       OffsetNumber offnum,
!                               maxoff,
!                               newoff,
!                               max_offset;
!       ItemId          itemid,
!                               newitemid;
!       HeapTupleData tuple,
!                               newtup;
!       TupleDesc       tupdesc;
!       ResultRelInfo *resultRelInfo;
!       EState     *estate;
!       TupleTable      tupleTable;
!       TupleTableSlot *slot;
        VacPageListData Nvacpagelist;
!       VacPage         cur_page = NULL,
                                last_vacuum_page,
                                vacpage,
                           *curpage;
-       int                     cur_item = 0;
        int                     i;
!       Size            tuple_len;
!       int                     num_moved,
                                num_fraged_pages,
                                vacuumed_pages;
!       int                     checked_moved,
!                               num_tuples,
!                               keep_tuples = 0;
!       bool            isempty,
!                               dowrite,
!                               chain_tuple_moved;
        VacRUsage       ru0;
  
        vac_init_rusage(&ru0);
  
!       myXID = GetCurrentTransactionId();
!       myCID = GetCurrentCommandId();
! 
!       tupdesc = RelationGetDescr(onerel);
! 
!       /*
!        * We need a ResultRelInfo and an EState so we can use the regular
!        * executor's index-entry-making machinery.
!        */
!       estate = CreateExecutorState();
! 
!       resultRelInfo = makeNode(ResultRelInfo);
!       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
!       resultRelInfo->ri_RelationDesc = onerel;
!       resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
! 
!       ExecOpenIndices(resultRelInfo);
! 
!       estate->es_result_relations = resultRelInfo;
!       estate->es_num_result_relations = 1;
!       estate->es_result_relation_info = resultRelInfo;
! 
!       /* Set up a dummy tuple table too */
!       tupleTable = ExecCreateTupleTable(1);
!       slot = ExecAllocTableSlot(tupleTable);
!       ExecSetSlotDescriptor(slot, tupdesc, false);
  
        Nvacpagelist.num_pages = 0;
        num_fraged_pages = fraged_pages->num_pages;
--- 1523,1551 ----
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel)
  {
!       TransactionId myXID = GetCurrentTransactionId();
!       Buffer          dst_buffer = InvalidBuffer;
        BlockNumber nblocks,
                                blkno;
        BlockNumber last_move_dest_block = 0,
                                last_vacuum_block;
!       Page            dst_page = NULL;
!       ExecContextData ec;
        VacPageListData Nvacpagelist;
!       VacPage         dst_vacpage = NULL,
                                last_vacuum_page,
                                vacpage,
                           *curpage;
        int                     i;
!       int                     num_moved = 0,
                                num_fraged_pages,
                                vacuumed_pages;
!       int                     keep_tuples = 0;
        VacRUsage       ru0;
  
        vac_init_rusage(&ru0);
  
!       ExecContext_Init(&ec, onerel);
  
        Nvacpagelist.num_pages = 0;
        num_fraged_pages = fraged_pages->num_pages;
***************
*** 1539,1546 ****
                last_vacuum_page = NULL;
                last_vacuum_block = InvalidBlockNumber;
        }
-       cur_buffer = InvalidBuffer;
-       num_moved = 0;
  
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * 
sizeof(OffsetNumber));
        vacpage->offsets_used = vacpage->offsets_free = 0;
--- 1562,1567 ----
***************
*** 1560,1565 ****
--- 1581,1594 ----
                 blkno > last_move_dest_block;
                 blkno--)
        {
+               Buffer                  buf;
+               Page                    page;
+               OffsetNumber    offnum,
+                                               maxoff;
+               bool                    isempty,
+                                               dowrite,
+                                               chain_tuple_moved;
+ 
                vacuum_delay_point();
  
                /*
***************
*** 1635,1641 ****
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
!                       itemid = PageGetItemId(page, offnum);
  
                        if (!ItemIdIsUsed(itemid))
                                continue;
--- 1664,1672 ----
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
!                       Size                    tuple_len;
!                       HeapTupleData   tuple;
!                       ItemId                  itemid = PageGetItemId(page, offnum);
  
                        if (!ItemIdIsUsed(itemid))
                                continue;
***************
*** 1645,1689 ****
                        tuple_len = tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
  
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
!                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
  
                                /*
                                 * If this (chain) tuple is moved by me already then I
                                 * have to check is it in vacpage or not - i.e. is it
                                 * moved while cleaning this page or some previous one.
                                 */
!                               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
!                               {
!                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != 
myXID)
!                                               elog(ERROR, "invalid XVAC in tuple 
header");
!                                       if (keep_tuples == 0)
!                                               continue;
!                                       if (chain_tuple_moved)          /* some chains 
was moved
!                                                                                      
          * while */
!                                       {                       /* cleaning this page 
*/
!                                               Assert(vacpage->offsets_free > 0);
!                                               for (i = 0; i < vacpage->offsets_free; 
i++)
!                                               {
!                                                       if (vacpage->offsets[i] == 
offnum)
!                                                               break;
!                                               }
!                                               if (i >= vacpage->offsets_free) /* not 
found */
!                                               {
!                                                       
vacpage->offsets[vacpage->offsets_free++] = offnum;
!                                                       keep_tuples--;
!                                               }
                                        }
!                                       else
                                        {
                                                
vacpage->offsets[vacpage->offsets_free++] = offnum;
                                                keep_tuples--;
                                        }
-                                       continue;
                                }
!                               elog(ERROR, "HEAP_MOVED_OFF was expected");
                        }
  
                        /*
--- 1676,1746 ----
                        tuple_len = tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
  
+                       /*
+                        * VACUUM FULL has an exclusive lock on the relation.  So
+                        * normally no other transaction can have pending INSERTs or
+                        * DELETEs in this relation.  A tuple is either
+                        *   (a) a tuple in a system catalog, inserted or deleted by
+                        *       a not yet committed transaction or
+                        *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
+                        *   (c) inserted by a committed xact (XMIN_COMMITTED) or
+                        *   (d) moved by the currently running VACUUM.
+                        * In case (a) we wouldn't be in repair_frag() at all.
+                        * In case (b) we cannot be here, because scan_heap() has
+                        * already marked the item as unused, see continue above.
+                        * Case (c) is what normally is to be expected.
+                        * Case (d) is only possible if a whole tuple chain has been
+                        * moved while processing this or a higher-numbered block.
+                        */
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
!                               /*
!                                * There cannot be another concurrently running 
VACUUM.  If
!                                * the tuple had been moved in by a previous VACUUM, 
the
!                                * visibility check would have set XMIN_COMMITTED.  If 
the
!                                * tuple had been moved in by the currently running 
VACUUM,
!                                * the loop would have been terminated.  We had
!                                * elog(ERROR, ...) here, but as we are testing for a
!                                * can't-happen condition, Assert() seems more 
appropriate.
!                                */
!                               Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
  
                                /*
                                 * If this (chain) tuple is moved by me already then I
                                 * have to check is it in vacpage or not - i.e. is it
                                 * moved while cleaning this page or some previous one.
                                 */
!                               Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
!                               /*
!                                * MOVED_OFF by another VACUUM would have caused the
!                                * visibility check to set XMIN_COMMITTED or 
XMIN_INVALID.
!                                */
!                               Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
! 
!                               /* Can't we Assert(keep_tuples > 0) here? */
!                               if (keep_tuples == 0)
!                                       continue;
!                               if (chain_tuple_moved)          /* some chains was 
moved
!                                                                                      
  * while */
!                               {                       /* cleaning this page */
!                                       Assert(vacpage->offsets_free > 0);
!                                       for (i = 0; i < vacpage->offsets_free; i++)
!                                       {
!                                               if (vacpage->offsets[i] == offnum)
!                                                       break;
                                        }
!                                       if (i >= vacpage->offsets_free) /* not found */
                                        {
                                                
vacpage->offsets[vacpage->offsets_free++] = offnum;
                                                keep_tuples--;
                                        }
                                }
!                               else
!                               {
!                                       vacpage->offsets[vacpage->offsets_free++] = 
offnum;
!                                       keep_tuples--;
!                               }
!                               continue;
                        }
  
                        /*
***************
*** 1716,1723 ****
                                Buffer          Cbuf = buf;
                                bool            freeCbuf = false;
                                bool            chain_move_failed = false;
-                               Page            Cpage;
-                               ItemId          Citemid;
                                ItemPointerData Ctid;
                                HeapTupleData tp = tuple;
                                Size            tlen = tuple_len;
--- 1773,1778 ----
***************
*** 1728,1737 ****
                                int                     to_item = 0;
                                int                     ti;
  
!                               if (cur_buffer != InvalidBuffer)
                                {
!                                       WriteBuffer(cur_buffer);
!                                       cur_buffer = InvalidBuffer;
                                }
  
                                /* Quick exit if we have no vtlinks to search in */
--- 1783,1792 ----
                                int                     to_item = 0;
                                int                     ti;
  
!                               if (dst_buffer != InvalidBuffer)
                                {
!                                       WriteBuffer(dst_buffer);
!                                       dst_buffer = InvalidBuffer;
                                }
  
                                /* Quick exit if we have no vtlinks to search in */
***************
*** 1754,1759 ****
--- 1809,1818 ----
                                           !(ItemPointerEquals(&(tp.t_self),
                                                                                   
&(tp.t_data->t_ctid))))
                                {
+                                       Page            Cpage;
+                                       ItemId          Citemid;
+                                       ItemPointerData Ctid;
+ 
                                        Ctid = tp.t_data->t_ctid;
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
***************
*** 1929,1940 ****
                                }
  
                                /*
!                                * Okay, move the whle tuple chain
                                 */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
                                {
                                        VacPage         destvacpage = 
vtmove[ti].vacpage;
  
                                        /* Get page to move from */
                                        tuple.t_self = vtmove[ti].tid;
--- 1988,2001 ----
                                }
  
                                /*
!                                * Okay, move the whole tuple chain
                                 */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
                                {
                                        VacPage         destvacpage = 
vtmove[ti].vacpage;
+                                       Page            Cpage;
+                                       ItemId          Citemid;
  
                                        /* Get page to move from */
                                        tuple.t_self = vtmove[ti].tid;
***************
*** 1942,1954 ****
                                                         
ItemPointerGetBlockNumber(&(tuple.t_self)));
  
                                        /* Get page to move to */
!                                       cur_buffer = ReadBuffer(onerel, 
destvacpage->blkno);
  
!                                       LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
!                                       if (cur_buffer != Cbuf)
                                                LockBuffer(Cbuf, 
BUFFER_LOCK_EXCLUSIVE);
  
!                                       ToPage = BufferGetPage(cur_buffer);
                                        Cpage = BufferGetPage(Cbuf);
  
                                        Citemid = PageGetItemId(Cpage,
--- 2003,2015 ----
                                                         
ItemPointerGetBlockNumber(&(tuple.t_self)));
  
                                        /* Get page to move to */
!                                       dst_buffer = ReadBuffer(onerel, 
destvacpage->blkno);
  
!                                       LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
!                                       if (dst_buffer != Cbuf)
                                                LockBuffer(Cbuf, 
BUFFER_LOCK_EXCLUSIVE);
  
!                                       dst_page = BufferGetPage(dst_buffer);
                                        Cpage = BufferGetPage(Cbuf);
  
                                        Citemid = PageGetItemId(Cpage,
***************
*** 1961,2081 ****
                                         * make a copy of the source tuple, and then 
mark the
                                         * source tuple MOVED_OFF.
                                         */
!                                       heap_copytuple_with_tuple(&tuple, &newtup);
! 
!                                       /*
!                                        * register invalidation of source tuple in 
catcaches.
!                                        */
!                                       CacheInvalidateHeapTuple(onerel, &tuple);
! 
!                                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!                                       START_CRIT_SECTION();
! 
!                                       tuple.t_data->t_infomask &= 
~(HEAP_XMIN_COMMITTED |
!                                                                                      
           HEAP_XMIN_INVALID |
!                                                                                      
           HEAP_MOVED_IN);
!                                       tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
!                                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
! 
!                                       /*
!                                        * If this page was not used before - clean it.
!                                        *
!                                        * NOTE: a nasty bug used to lurk here.  It is 
possible
!                                        * for the source and destination pages to be 
the same
!                                        * (since this tuple-chain member can be on a 
page
!                                        * lower than the one we're currently 
processing in
!                                        * the outer loop).  If that's true, then after
!                                        * vacuum_page() the source tuple will have 
been
!                                        * moved, and tuple.t_data will be pointing at
!                                        * garbage.  Therefore we must do everything 
that uses
!                                        * tuple.t_data BEFORE this step!!
!                                        *
!                                        * This path is different from the other 
callers of
!                                        * vacuum_page, because we have already 
incremented
!                                        * the vacpage's offsets_used field to account 
for the
!                                        * tuple(s) we expect to move onto the page. 
Therefore
!                                        * vacuum_page's check for offsets_used == 0 
is wrong.
!                                        * But since that's a good debugging check for 
all
!                                        * other callers, we work around it here 
rather than
!                                        * remove it.
!                                        */
!                                       if (!PageIsEmpty(ToPage) && 
vtmove[ti].cleanVpd)
!                                       {
!                                               int                     
sv_offsets_used = destvacpage->offsets_used;
! 
!                                               destvacpage->offsets_used = 0;
!                                               vacuum_page(onerel, cur_buffer, 
destvacpage);
!                                               destvacpage->offsets_used = 
sv_offsets_used;
!                                       }
! 
!                                       /*
!                                        * Update the state of the copied tuple, and 
store it
!                                        * on the destination page.
!                                        */
!                                       newtup.t_data->t_infomask &= 
~(HEAP_XMIN_COMMITTED |
!                                                                                      
            HEAP_XMIN_INVALID |
!                                                                                      
            HEAP_MOVED_OFF);
!                                       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!                                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!                                       newoff = PageAddItem(ToPage,
!                                                                                
(Item) newtup.t_data,
!                                                                                
tuple_len,
!                                                                                
InvalidOffsetNumber,
!                                                                                
LP_USED);
!                                       if (newoff == InvalidOffsetNumber)
!                                       {
!                                               elog(PANIC, "failed to add item with 
len = %lu to page %u while moving tuple chain",
!                                                 (unsigned long) tuple_len, 
destvacpage->blkno);
!                                       }
!                                       newitemid = PageGetItemId(ToPage, newoff);
!                                       pfree(newtup.t_data);
!                                       newtup.t_datamcxt = NULL;
!                                       newtup.t_data = (HeapTupleHeader) 
PageGetItem(ToPage, newitemid);
!                                       ItemPointerSet(&(newtup.t_self), 
destvacpage->blkno, newoff);
! 
!                                       /* XLOG stuff */
!                                       if (!onerel->rd_istemp)
!                                       {
!                                               XLogRecPtr      recptr =
!                                               log_heap_move(onerel, Cbuf, 
tuple.t_self,
!                                                                         cur_buffer, 
&newtup);
! 
!                                               if (Cbuf != cur_buffer)
!                                               {
!                                                       PageSetLSN(Cpage, recptr);
!                                                       PageSetSUI(Cpage, 
ThisStartUpID);
!                                               }
!                                               PageSetLSN(ToPage, recptr);
!                                               PageSetSUI(ToPage, ThisStartUpID);
!                                       }
!                                       else
!                                       {
!                                               /*
!                                                * No XLOG record, but still need to 
flag that XID
!                                                * exists on disk
!                                                */
!                                               MyXactMadeTempRelUpdate = true;
!                                       }
! 
!                                       END_CRIT_SECTION();
  
                                        if (destvacpage->blkno > last_move_dest_block)
                                                last_move_dest_block = 
destvacpage->blkno;
  
                                        /*
-                                        * Set new tuple's t_ctid pointing to itself 
for last
-                                        * tuple in chain, and to next tuple in chain
-                                        * otherwise.
-                                        */
-                                       if (!ItemPointerIsValid(&Ctid))
-                                               newtup.t_data->t_ctid = newtup.t_self;
-                                       else
-                                               newtup.t_data->t_ctid = Ctid;
-                                       Ctid = newtup.t_self;
- 
-                                       num_moved++;
- 
-                                       /*
                                         * Remember that we moved tuple from the 
current page
                                         * (corresponding index tuple will be cleaned).
                                         */
--- 2022,2036 ----
                                         * make a copy of the source tuple, and then 
mark the
                                         * source tuple MOVED_OFF.
                                         */
!                                       move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
!                                                                        dst_buffer, 
dst_page, destvacpage,
!                                                                        &ec, &Ctid, 
vtmove[ti].cleanVpd);
  
+                                       num_moved++;
                                        if (destvacpage->blkno > last_move_dest_block)
                                                last_move_dest_block = 
destvacpage->blkno;
  
                                        /*
                                         * Remember that we moved tuple from the 
current page
                                         * (corresponding index tuple will be cleaned).
                                         */
***************
*** 2085,2107 ****
                                        else
                                                keep_tuples++;
  
!                                       LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
!                                       if (cur_buffer != Cbuf)
!                                               LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
! 
!                                       /* Create index entries for the moved tuple */
!                                       if (resultRelInfo->ri_NumIndices > 0)
!                                       {
!                                               ExecStoreTuple(&newtup, slot, 
InvalidBuffer, false);
!                                               ExecInsertIndexTuples(slot, 
&(newtup.t_self),
!                                                                                      
   estate, true);
!                                       }
! 
!                                       WriteBuffer(cur_buffer);
                                        WriteBuffer(Cbuf);
                                }                               /* end of 
move-the-tuple-chain loop */
  
!                               cur_buffer = InvalidBuffer;
                                pfree(vtmove);
                                chain_tuple_moved = true;
  
--- 2040,2050 ----
                                        else
                                                keep_tuples++;
  
!                                       WriteBuffer(dst_buffer);
                                        WriteBuffer(Cbuf);
                                }                               /* end of 
move-the-tuple-chain loop */
  
!                               dst_buffer = InvalidBuffer;
                                pfree(vtmove);
                                chain_tuple_moved = true;
  
***************
*** 2110,2122 ****
                        }                                       /* end of 
is-tuple-in-chain test */
  
                        /* try to find new page for this tuple */
!                       if (cur_buffer == InvalidBuffer ||
!                               !enough_space(cur_page, tuple_len))
                        {
!                               if (cur_buffer != InvalidBuffer)
                                {
!                                       WriteBuffer(cur_buffer);
!                                       cur_buffer = InvalidBuffer;
                                }
                                for (i = 0; i < num_fraged_pages; i++)
                                {
--- 2053,2065 ----
                        }                                       /* end of 
is-tuple-in-chain test */
  
                        /* try to find new page for this tuple */
!                       if (dst_buffer == InvalidBuffer ||
!                               !enough_space(dst_vacpage, tuple_len))
                        {
!                               if (dst_buffer != InvalidBuffer)
                                {
!                                       WriteBuffer(dst_buffer);
!                                       dst_buffer = InvalidBuffer;
                                }
                                for (i = 0; i < num_fraged_pages; i++)
                                {
***************
*** 2125,2234 ****
                                }
                                if (i == num_fraged_pages)
                                        break;          /* can't move item anywhere */
!                               cur_item = i;
!                               cur_page = fraged_pages->pagedesc[cur_item];
!                               cur_buffer = ReadBuffer(onerel, cur_page->blkno);
!                               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
!                               ToPage = BufferGetPage(cur_buffer);
                                /* if this page was not used before - clean it */
!                               if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 
0)
!                                       vacuum_page(onerel, cur_buffer, cur_page);
                        }
                        else
!                               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
  
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  
!                       /* copy tuple */
!                       heap_copytuple_with_tuple(&tuple, &newtup);
! 
!                       /*
!                        * register invalidation of source tuple in catcaches.
!                        *
!                        * (Note: we do not need to register the copied tuple, because 
we
!                        * are not changing the tuple contents and so there cannot be
!                        * any need to flush negative catcache entries.)
!                        */
!                       CacheInvalidateHeapTuple(onerel, &tuple);
  
-                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-                       START_CRIT_SECTION();
  
!                       /*
!                        * Mark new tuple as MOVED_IN by me.
!                        */
!                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                                                  
HEAP_XMIN_INVALID |
!                                                                                  
HEAP_MOVED_OFF);
!                       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
! 
!                       /* add tuple to the page */
!                       newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
!                                                                InvalidOffsetNumber, 
LP_USED);
!                       if (newoff == InvalidOffsetNumber)
!                       {
!                               elog(PANIC, "failed to add item with len = %lu to page 
%u (free space %lu, nusd %u, noff %u)",
!                                        (unsigned long) tuple_len,
!                                        cur_page->blkno, (unsigned long) 
cur_page->free,
!                                        cur_page->offsets_used, 
cur_page->offsets_free);
!                       }
!                       newitemid = PageGetItemId(ToPage, newoff);
!                       pfree(newtup.t_data);
!                       newtup.t_datamcxt = NULL;
!                       newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, 
newitemid);
!                       ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, 
newoff);
!                       newtup.t_self = newtup.t_data->t_ctid;
  
                        /*
!                        * Mark old tuple as MOVED_OFF by me.
                         */
-                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                                                 
HEAP_XMIN_INVALID |
-                                                                                 
HEAP_MOVED_IN);
-                       tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
- 
-                       /* XLOG stuff */
-                       if (!onerel->rd_istemp)
-                       {
-                               XLogRecPtr      recptr =
-                               log_heap_move(onerel, buf, tuple.t_self,
-                                                         cur_buffer, &newtup);
- 
-                               PageSetLSN(page, recptr);
-                               PageSetSUI(page, ThisStartUpID);
-                               PageSetLSN(ToPage, recptr);
-                               PageSetSUI(ToPage, ThisStartUpID);
-                       }
-                       else
-                       {
-                               /*
-                                * No XLOG record, but still need to flag that XID 
exists
-                                * on disk
-                                */
-                               MyXactMadeTempRelUpdate = true;
-                       }
- 
-                       END_CRIT_SECTION();
- 
-                       cur_page->offsets_used++;
-                       num_moved++;
-                       cur_page->free = ((PageHeader) ToPage)->pd_upper - 
((PageHeader) ToPage)->pd_lower;
-                       if (cur_page->blkno > last_move_dest_block)
-                               last_move_dest_block = cur_page->blkno;
- 
                        vacpage->offsets[vacpage->offsets_free++] = offnum;
- 
-                       LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
- 
-                       /* insert index' tuples if needed */
-                       if (resultRelInfo->ri_NumIndices > 0)
-                       {
-                               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-                               ExecInsertIndexTuples(slot, &(newtup.t_self), estate, 
true);
-                       }
                }                                               /* walk along page */
  
                /*
--- 2068,2099 ----
                                }
                                if (i == num_fraged_pages)
                                        break;          /* can't move item anywhere */
!                               dst_vacpage = fraged_pages->pagedesc[i];
!                               dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
!                               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
!                               dst_page = BufferGetPage(dst_buffer);
                                /* if this page was not used before - clean it */
!                               if (!PageIsEmpty(dst_page) && 
dst_vacpage->offsets_used == 0)
!                                       vacuum_page(onerel, dst_buffer, dst_vacpage);
                        }
                        else
!                               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
  
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  
!                       move_plain_tuple(onerel, buf, page, &tuple,
!                                                        dst_buffer, dst_page, 
dst_vacpage, &ec);
  
  
!                       num_moved++;
!                       if (dst_vacpage->blkno > last_move_dest_block)
!                               last_move_dest_block = dst_vacpage->blkno;
  
                        /*
!                        * Remember that we moved tuple from the current page
!                        * (corresponding index tuple will be cleaned).
                         */
                        vacpage->offsets[vacpage->offsets_free++] = offnum;
                }                                               /* walk along page */
  
                /*
***************
*** 2249,2284 ****
                                 off <= maxoff;
                                 off = OffsetNumberNext(off))
                        {
!                               itemid = PageGetItemId(page, off);
                                if (!ItemIdIsUsed(itemid))
                                        continue;
!                               tuple.t_datamcxt = NULL;
!                               tuple.t_data = (HeapTupleHeader) PageGetItem(page, 
itemid);
!                               if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
                                        continue;
!                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
!                               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                {
!                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != 
myXID)
!                                               elog(ERROR, "invalid XVAC in tuple 
header");
!                                       /* some chains was moved while */
!                                       if (chain_tuple_moved)
!                                       {                       /* cleaning this page 
*/
!                                               Assert(vacpage->offsets_free > 0);
!                                               for (i = 0; i < vacpage->offsets_free; 
i++)
!                                               {
!                                                       if (vacpage->offsets[i] == off)
!                                                               break;
!                                               }
!                                               if (i >= vacpage->offsets_free) /* not 
found */
!                                               {
!                                                       
vacpage->offsets[vacpage->offsets_free++] = off;
!                                                       Assert(keep_tuples > 0);
!                                                       keep_tuples--;
!                                               }
                                        }
!                                       else
                                        {
                                                
vacpage->offsets[vacpage->offsets_free++] = off;
                                                Assert(keep_tuples > 0);
--- 2114,2144 ----
                                 off <= maxoff;
                                 off = OffsetNumberNext(off))
                        {
!                               ItemId  itemid = PageGetItemId(page, off);
!                               HeapTupleHeader htup;
! 
                                if (!ItemIdIsUsed(itemid))
                                        continue;
!                               htup = (HeapTupleHeader) PageGetItem(page, itemid);
!                               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
                                        continue;
!                               /*
!                               ** See the comments in the walk-along-page loop above for why
!                               ** we have Asserts here instead of if (...) elog(ERROR).
!                               */
!                               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
!                               Assert(htup->t_infomask & HEAP_MOVED_OFF);
!                               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
!                               if (chain_tuple_moved)
                                {
!                                       /* some chains were moved while cleaning this page */
!                                       Assert(vacpage->offsets_free > 0);
!                                       for (i = 0; i < vacpage->offsets_free; i++)
!                                       {
!                                               if (vacpage->offsets[i] == off)
!                                                       break;
                                        }
!                                       if (i >= vacpage->offsets_free) /* not found */
                                        {
                                                
vacpage->offsets[vacpage->offsets_free++] = off;
                                                Assert(keep_tuples > 0);
***************
*** 2286,2292 ****
                                        }
                                }
                                else
!                                       elog(ERROR, "HEAP_MOVED_OFF was expected");
                        }
                }
  
--- 2146,2156 ----
                                        }
                                }
                                else
!                               {
!                                       vacpage->offsets[vacpage->offsets_free++] = 
off;
!                                       Assert(keep_tuples > 0);
!                                       keep_tuples--;
!                               }
                        }
                }
  
***************
*** 2312,2321 ****
  
        blkno++;                                        /* new number of blocks */
  
!       if (cur_buffer != InvalidBuffer)
        {
                Assert(num_moved > 0);
!               WriteBuffer(cur_buffer);
        }
  
        if (num_moved > 0)
--- 2176,2185 ----
  
        blkno++;                                        /* new number of blocks */
  
!       if (dst_buffer != InvalidBuffer)
        {
                Assert(num_moved > 0);
!               WriteBuffer(dst_buffer);
        }
  
        if (num_moved > 0)
***************
*** 2348,2353 ****
--- 2212,2220 ----
                Assert((*curpage)->blkno < blkno);
                if ((*curpage)->offsets_used == 0)
                {
+                       Buffer          buf;
+                       Page            page;
+ 
                        /* this page was not used as a move target, so must clean it */
                        buf = ReadBuffer(onerel, (*curpage)->blkno);
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
***************
*** 2363,2424 ****
         * Now scan all the pages that we moved tuples onto and update tuple
         * status bits.  This is not really necessary, but will save time for
         * future transactions examining these tuples.
-        *
-        * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
-        * pages that were move source pages but not move dest pages.  One
-        * also wonders whether it wouldn't be better to skip this step and
-        * let the tuple status updates happen someplace that's not holding an
-        * exclusive lock on the relation.
         */
!       checked_moved = 0;
!       for (i = 0, curpage = fraged_pages->pagedesc;
!                i < num_fraged_pages;
!                i++, curpage++)
!       {
!               vacuum_delay_point();
! 
!               Assert((*curpage)->blkno < blkno);
!               if ((*curpage)->blkno > last_move_dest_block)
!                       break;                          /* no need to scan any further 
*/
!               if ((*curpage)->offsets_used == 0)
!                       continue;                       /* this page was never used as 
a move dest */
!               buf = ReadBuffer(onerel, (*curpage)->blkno);
!               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
!               page = BufferGetPage(buf);
!               num_tuples = 0;
!               max_offset = PageGetMaxOffsetNumber(page);
!               for (newoff = FirstOffsetNumber;
!                        newoff <= max_offset;
!                        newoff = OffsetNumberNext(newoff))
!               {
!                       itemid = PageGetItemId(page, newoff);
!                       if (!ItemIdIsUsed(itemid))
!                               continue;
!                       tuple.t_datamcxt = NULL;
!                       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
!                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
!                       {
!                               if (!(tuple.t_data->t_infomask & HEAP_MOVED))
!                                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was 
expected");
!                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                                       elog(ERROR, "invalid XVAC in tuple header");
!                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                               {
!                                       tuple.t_data->t_infomask |= 
HEAP_XMIN_COMMITTED;
!                                       tuple.t_data->t_infomask &= ~HEAP_MOVED;
!                                       num_tuples++;
!                               }
!                               else
!                                       tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
!                       }
!               }
!               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!               WriteBuffer(buf);
!               Assert((*curpage)->offsets_used == num_tuples);
!               checked_moved += num_tuples;
!       }
!       Assert(num_moved == checked_moved);
! 
        /*
         * It'd be cleaner to make this report at the bottom of this routine,
         * but then the rusage would double-count the second pass of index
--- 2230,2239 ----
         * Now scan all the pages that we moved tuples onto and update tuple
         * status bits.  This is not really necessary, but will save time for
         * future transactions examining these tuples.
         */
!       update_hint_bits(onerel, fraged_pages, num_fraged_pages,
!                                        last_move_dest_block, num_moved);
!   
        /*
         * It'd be cleaner to make this report at the bottom of this routine,
         * but then the rusage would double-count the second pass of index
***************
*** 2455,2460 ****
--- 2270,2282 ----
                                *vpleft = *vpright;
                                *vpright = vpsave;
                        }
+                       /*
+                        * keep_tuples is the number of tuples that have been moved
+                        * off a page during chain moves but not been scanned over
+                        * subsequently.  The tuple ids of these tuples are not
+                        * recorded as free offsets for any VacPage, so they will not
+                        * be cleared from the indexes.
+                        */
                        Assert(keep_tuples >= 0);
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&Nvacpagelist, Irel[i],
***************
*** 2465,2500 ****
                if (vacpage->blkno == (blkno - 1) &&
                        vacpage->offsets_free > 0)
                {
!                       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
!                       int                     uncnt;
  
                        buf = ReadBuffer(onerel, vacpage->blkno);
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                        page = BufferGetPage(buf);
-                       num_tuples = 0;
                        maxoff = PageGetMaxOffsetNumber(page);
                        for (offnum = FirstOffsetNumber;
                                 offnum <= maxoff;
                                 offnum = OffsetNumberNext(offnum))
                        {
!                               itemid = PageGetItemId(page, offnum);
                                if (!ItemIdIsUsed(itemid))
                                        continue;
!                               tuple.t_datamcxt = NULL;
!                               tuple.t_data = (HeapTupleHeader) PageGetItem(page, 
itemid);
  
!                               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
!                               {
!                                       if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
!                                       {
!                                               if 
(HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                                                       elog(ERROR, "invalid XVAC in 
tuple header");
!                                               itemid->lp_flags &= ~LP_USED;
!                                               num_tuples++;
!                                       }
!                                       else
!                                               elog(ERROR, "HEAP_MOVED_OFF was 
expected");
!                               }
  
                        }
                        Assert(vacpage->offsets_free == num_tuples);
--- 2287,2327 ----
                if (vacpage->blkno == (blkno - 1) &&
                        vacpage->offsets_free > 0)
                {
!                       Buffer                  buf;
!                       Page                    page;
!                       OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
!                       OffsetNumber    offnum,
!                                                       maxoff;
!                       int                             uncnt;
!                       int                             num_tuples = 0;
  
                        buf = ReadBuffer(onerel, vacpage->blkno);
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                        page = BufferGetPage(buf);
                        maxoff = PageGetMaxOffsetNumber(page);
                        for (offnum = FirstOffsetNumber;
                                 offnum <= maxoff;
                                 offnum = OffsetNumberNext(offnum))
                        {
!                               ItemId  itemid = PageGetItemId(page, offnum);
!                               HeapTupleHeader htup;
! 
                                if (!ItemIdIsUsed(itemid))
                                        continue;
!                               htup = (HeapTupleHeader) PageGetItem(page, itemid);
!                               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
!                                       continue;
  
!                               /*
!                               ** See the comments in the walk-along-page loop above for why
!                               ** we have Asserts here instead of if (...) elog(ERROR).
!                               */
!                               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
!                               Assert(htup->t_infomask & HEAP_MOVED_OFF);
!                               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
! 
!                               itemid->lp_flags &= ~LP_USED;
!                               num_tuples++;
  
                        }
                        Assert(vacpage->offsets_free == num_tuples);
***************
*** 2554,2564 ****
        if (vacrelstats->vtlinks != NULL)
                pfree(vacrelstats->vtlinks);
  
!       ExecDropTupleTable(tupleTable, true);
  
!       ExecCloseIndices(resultRelInfo);
  
!       FreeExecutorState(estate);
  }
  
  /*
--- 2381,2717 ----
        if (vacrelstats->vtlinks != NULL)
                pfree(vacrelstats->vtlinks);
  
!       ExecContext_Finish(&ec);
! }
! 
! /*
!  *    move_chain_tuple() -- move one tuple that is part of a tuple chain
!  *
!  *            This routine moves old_tup from old_page to dst_page.
!  *            old_page and dst_page might be the same page.
!  *            On entry old_buf and dst_buf are locked exclusively, both locks (or
!  *            the single lock, if this is an intra-page move) are released before
!  *            exit.
!  *
!  *            rel is the relation the tuple belongs to; it is passed on to
!  *            CacheInvalidateHeapTuple(), vacuum_page() and log_heap_move().
!  *            old_buf/old_page/old_tup identify the source tuple.
!  *            dst_buf/dst_page/dst_vacpage identify the destination page;
!  *            dst_vacpage->blkno supplies the block number of the new TID.
!  *            ec supplies the slot and executor state used to insert index
!  *            entries for the moved tuple.
!  *
!  *            ctid is an in/out parameter.  If it is valid on entry, the new
!  *            tuple's t_ctid is set to it; otherwise the new tuple's t_ctid is
!  *            set to the new tuple's own TID.  On exit *ctid holds the TID of
!  *            the new tuple.
!  *
!  *            If cleanVpd is true and dst_page is not empty, vacuum_page() is
!  *            run on dst_page before the tuple is stored there.
!  *
!  *            Yes, a routine with ten parameters is ugly, but it's still better
!  *            than having these 120 lines of code in repair_frag() which is
!  *            already too long and almost unreadable.
!  */
! static void
! move_chain_tuple(Relation rel,
!                                Buffer old_buf, Page old_page, HeapTuple old_tup,
!                                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
!                                ExecContext ec, ItemPointer ctid, bool cleanVpd)
! {
!       TransactionId myXID = GetCurrentTransactionId();
!       HeapTupleData   newtup;
!       OffsetNumber    newoff;
!       ItemId                  newitemid;
!       Size                    tuple_len = old_tup->t_len;
! 
!       heap_copytuple_with_tuple(old_tup, &newtup);
! 
!       /*
!        * register invalidation of source tuple in catcaches.
!        */
!       CacheInvalidateHeapTuple(rel, old_tup);
! 
!       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!       START_CRIT_SECTION();
! 
!       old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                                 HEAP_XMIN_INVALID |
!                                                                 HEAP_MOVED_IN);
!       old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
!       HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
! 
!       /*
!        * If this page was not used before - clean it.
!        *
!        * NOTE: a nasty bug used to lurk here.  It is possible
!        * for the source and destination pages to be the same
!        * (since this tuple-chain member can be on a page
!        * lower than the one we're currently processing in
!        * the outer loop).  If that's true, then after
!        * vacuum_page() the source tuple will have been
!        * moved, and old_tup->t_data will be pointing at
!        * garbage.  Therefore we must do everything that uses
!        * old_tup->t_data BEFORE this step!!
!        *
!        * This path is different from the other callers of
!        * vacuum_page, because we have already incremented
!        * the vacpage's offsets_used field to account for the
!        * tuple(s) we expect to move onto the page. Therefore
!        * vacuum_page's check for offsets_used == 0 is wrong.
!        * But since that's a good debugging check for all
!        * other callers, we work around it here rather than
!        * remove it: offsets_used is saved, zeroed for the
!        * duration of the call, and restored afterwards.
!        */
!       if (!PageIsEmpty(dst_page) && cleanVpd)
!       {
!               int             sv_offsets_used = dst_vacpage->offsets_used;
! 
!               dst_vacpage->offsets_used = 0;
!               vacuum_page(rel, dst_buf, dst_vacpage);
!               dst_vacpage->offsets_used = sv_offsets_used;
!       }
  
!       /*
!        * Update the state of the copied tuple, and store it
!        * on the destination page.  Once the item has been
!        * added, the private copy is freed and newtup.t_data
!        * is repointed at the item stored on dst_page.
!        */
!       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                                  HEAP_XMIN_INVALID |
!                                                                  HEAP_MOVED_OFF);
!       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!       newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
!                                                InvalidOffsetNumber, LP_USED);
!       if (newoff == InvalidOffsetNumber)
!       {
!               elog(PANIC, "failed to add item with len = %lu to page %u while moving 
tuple chain",
!                 (unsigned long) tuple_len, dst_vacpage->blkno);
!       }
!       newitemid = PageGetItemId(dst_page, newoff);
!       pfree(newtup.t_data);
!       newtup.t_datamcxt = NULL;
!       newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
!       ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
! 
!       /* XLOG stuff: log the move unless this is a temp relation */
!       if (!rel->rd_istemp)
!       {
!               XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
!                                                                                  
dst_buf, &newtup);
! 
!               if (old_buf != dst_buf)
!               {
!                       PageSetLSN(old_page, recptr);
!                       PageSetSUI(old_page, ThisStartUpID);
!               }
!               PageSetLSN(dst_page, recptr);
!               PageSetSUI(dst_page, ThisStartUpID);
!       }
!       else
!       {
!               /*
!                * No XLOG record, but still need to flag that XID
!                * exists on disk
!                */
!               MyXactMadeTempRelUpdate = true;
!       }
! 
!       END_CRIT_SECTION();
! 
!       /*
!        * Set new tuple's t_ctid pointing to itself for last
!        * tuple in chain, and to next tuple in chain
!        * otherwise.  Afterwards *ctid is updated to the new
!        * tuple's TID.
!        */
!       /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
!       if (!ItemPointerIsValid(ctid))
!               newtup.t_data->t_ctid = newtup.t_self;
!       else
!               newtup.t_data->t_ctid = *ctid;
!       *ctid = newtup.t_self;
  
!       LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
!       if (dst_buf != old_buf)
!               LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
! 
!       /* Create index entries for the moved tuple, if the relation has indexes */
!       if (ec->resultRelInfo->ri_NumIndices > 0)
!       {
!               ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
!               ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
!       }
! }
! 
! /*
!  *    move_plain_tuple() -- move one tuple that is not part of a chain
!  *
!  *            This routine moves old_tup from old_page to dst_page.
!  *            On entry old_buf and dst_buf are locked exclusively, both locks are
!  *            released before exit.
!  *
!  *            rel is the relation the tuple belongs to; it is passed on to
!  *            CacheInvalidateHeapTuple() and log_heap_move().
!  *            old_buf/old_page/old_tup identify the source tuple.
!  *            dst_buf/dst_page/dst_vacpage identify the destination page;
!  *            dst_vacpage->blkno supplies the block number of the new TID.
!  *            ec supplies the slot and executor state used to insert index
!  *            entries for the moved tuple.
!  *
!  *            The new tuple's t_ctid is set to its own TID.  Before returning,
!  *            dst_vacpage->free is recomputed from the page header (pd_upper -
!  *            pd_lower) and dst_vacpage->offsets_used is incremented.
!  *
!  *            Yes, a routine with eight parameters is ugly, but it's still better
!  *            than having these 90 lines of code in repair_frag() which is already
!  *            too long and almost unreadable.
!  */
! static void
! move_plain_tuple(Relation rel,
!                                Buffer old_buf, Page old_page, HeapTuple old_tup,
!                                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
!                                ExecContext ec)
! {
!       TransactionId myXID = GetCurrentTransactionId();
!       HeapTupleData   newtup;
!       OffsetNumber    newoff;
!       ItemId                  newitemid;
!       Size                    tuple_len = old_tup->t_len;
! 
!       /* copy tuple */
!       heap_copytuple_with_tuple(old_tup, &newtup);
! 
!       /*
!        * register invalidation of source tuple in catcaches.
!        *
!        * (Note: we do not need to register the copied tuple, because we
!        * are not changing the tuple contents and so there cannot be
!        * any need to flush negative catcache entries.)
!        */
!       CacheInvalidateHeapTuple(rel, old_tup);
! 
!       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!       START_CRIT_SECTION();
! 
!       /*
!        * Mark new tuple as MOVED_IN by me.
!        */
!       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                                  HEAP_XMIN_INVALID |
!                                                                  HEAP_MOVED_OFF);
!       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
! 
!       /* add tuple to the page; failure to fit is reported with PANIC */
!       newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
!                                                InvalidOffsetNumber, LP_USED);
!       if (newoff == InvalidOffsetNumber)
!       {
!               elog(PANIC, "failed to add item with len = %lu to page %u (free space 
%lu, nusd %u, noff %u)",
!                        (unsigned long) tuple_len,
!                        dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
!                        dst_vacpage->offsets_used, dst_vacpage->offsets_free);
!       }
!       newitemid = PageGetItemId(dst_page, newoff);
!       pfree(newtup.t_data);
!       newtup.t_datamcxt = NULL;
!       newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
!       ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
!       newtup.t_self = newtup.t_data->t_ctid;
! 
!       /*
!        * Mark old tuple as MOVED_OFF by me.
!        */
!       old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                                 HEAP_XMIN_INVALID |
!                                                                 HEAP_MOVED_IN);
!       old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
!       HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
! 
!       /* XLOG stuff: log the move unless this is a temp relation */
!       if (!rel->rd_istemp)
!       {
!               XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
!                                                                                  
dst_buf, &newtup);
! 
!               PageSetLSN(old_page, recptr);
!               PageSetSUI(old_page, ThisStartUpID);
!               PageSetLSN(dst_page, recptr);
!               PageSetSUI(dst_page, ThisStartUpID);
!       }
!       else
!       {
!               /*
!                * No XLOG record, but still need to flag that XID exists
!                * on disk
!                */
!               MyXactMadeTempRelUpdate = true;
!       }
! 
!       END_CRIT_SECTION();
! 
!       dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
!                                               ((PageHeader) dst_page)->pd_lower;
!       LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
!       LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
! 
!       dst_vacpage->offsets_used++;
! 
!       /* insert index tuples if the relation has any indexes */
!       if (ec->resultRelInfo->ri_NumIndices > 0)
!       {
!               ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
!               ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
!       }
! }
! 
! /*
!  *    update_hint_bits() -- update hint bits in destination pages
!  *
!  * Scan all the pages that we moved tuples onto and update tuple
!  * status bits.  This is not really necessary, but will save time for
!  * future transactions examining these tuples.
!  *
!  * The first num_fraged_pages entries of fraged_pages->pagedesc are
!  * examined, reading each page from rel.  The scan stops at the first
!  * entry whose block number exceeds last_move_dest_block (presumably the
!  * list is ordered by block number -- verify against the caller), and
!  * entries with offsets_used == 0 are skipped.
!  *
!  * On each scanned page, used items that already have
!  * HEAP_XMIN_COMMITTED set are left alone.  Of the rest, MOVED_IN tuples
!  * get HEAP_XMIN_COMMITTED set and the HEAP_MOVED bits cleared; all
!  * others get HEAP_XMIN_INVALID set.
!  *
!  * num_moved is the number of MOVED_IN tuples the caller expects to be
!  * found in total; it is cross-checked with an Assert, as is each page's
!  * offsets_used count.
!  *
!  * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
!  * pages that were move source pages but not move dest pages.  One
!  * also wonders whether it wouldn't be better to skip this step and
!  * let the tuple status updates happen someplace that's not holding an
!  * exclusive lock on the relation.
!  */
! static void
! update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
!                                BlockNumber last_move_dest_block, int num_moved)
! {
!       int                     checked_moved = 0;
!       int                     i;
!       VacPage    *curpage;
! 
!       for (i = 0, curpage = fraged_pages->pagedesc;
!                i < num_fraged_pages;
!                i++, curpage++)
!       {
!               Buffer                  buf;
!               Page                    page;
!               OffsetNumber    max_offset;
!               OffsetNumber    off;
!               int                             num_tuples = 0;
! 
!               vacuum_delay_point();
! 
!               if ((*curpage)->blkno > last_move_dest_block)
!                       break;                          /* no need to scan any further */
!               if ((*curpage)->offsets_used == 0)
!                       continue;                       /* this page was never used as a move dest */
!               buf = ReadBuffer(rel, (*curpage)->blkno);
!               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
!               page = BufferGetPage(buf);
!               max_offset = PageGetMaxOffsetNumber(page);
!               for (off = FirstOffsetNumber;
!                        off <= max_offset;
!                        off = OffsetNumberNext(off))
!               {
!                       ItemId  itemid = PageGetItemId(page, off);
!                       HeapTupleHeader htup;
! 
!                       if (!ItemIdIsUsed(itemid))
!                               continue;
!                       htup = (HeapTupleHeader) PageGetItem(page, itemid);
!                       if (htup->t_infomask & HEAP_XMIN_COMMITTED)
!                               continue;
!                       /*
!                        * See comments in the walk-along-page loop above, why we
!                        * have Asserts here instead of if (...) elog(ERROR).  The
!                        * difference here is that we may see MOVED_IN.
!                        *
!                        * Only MOVED_IN tuples are counted in num_tuples.
!                        */
!                       Assert(htup->t_infomask & HEAP_MOVED);
!                       Assert(HeapTupleHeaderGetXvac(htup) == 
GetCurrentTransactionId());
!                       if (htup->t_infomask & HEAP_MOVED_IN)
!                       {
!                               htup->t_infomask |= HEAP_XMIN_COMMITTED;
!                               htup->t_infomask &= ~HEAP_MOVED;
!                               num_tuples++;
!                       }
!                       else
!                               htup->t_infomask |= HEAP_XMIN_INVALID;
!               }
!               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!               WriteBuffer(buf);
!               Assert((*curpage)->offsets_used == num_tuples);
!               checked_moved += num_tuples;
!       }
!       Assert(num_moved == checked_moved);
  }
  
  /*
---------------------------(end of broadcast)---------------------------
TIP 6: Have you searched our list archives?

               http://archives.postgresql.org

Reply via email to