Patch applied.  Thanks.

---------------------------------------------------------------------------


Manfred Koizar wrote:
>    . rename variables
>      . cur_buffer -> dst_buffer
>      . ToPage -> dst_page
>      . cur_page -> dst_vacpage
>    . move variable declarations into block where variable is used
>    . various Asserts instead of elog(ERROR, ...)
>    . extract functionality from repair_frag() into new routines
>      . move_chain_tuple()
>      . move_plain_tuple()
>      . update_hint_bits()
>    . create type ExecContext
>    . add comments
> 
> This patch does not intend to change any behaviour.  It passes make
> check, make installcheck and some manual tests.  It might be hard to
> review, because some lines are affected by more than one change.  If
> it's too much to swallow at once, I can provide it in smaller chunks ...
> 
> Servus
>  Manfred

> diff -Ncr ../base/src/backend/commands/vacuum.c src/backend/commands/vacuum.c
> *** ../base/src/backend/commands/vacuum.c     Mon May 31 21:24:05 2004
> --- src/backend/commands/vacuum.c     Wed Jun  2 21:46:59 2004
> ***************
> *** 99,104 ****
> --- 99,162 ----
>       VTupleLink      vtlinks;
>   } VRelStats;
>   
> + /*----------------------------------------------------------------------
> +  * ExecContext:
> +  *
> +  * As these variables always appear together, we put them into one struct
> +  * and pull initialization and cleanup into separate routines.
> +  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
> +  * accurately:  It is *used* only in move_xxx_tuple(), but because this
> +  * routine is called many times, we initialize the struct just once in
> +  * repair_frag() and pass it on to move_xxx_tuple().
> +  */
> + typedef struct ExecContextData
> + {
> +     ResultRelInfo *resultRelInfo;
> +     EState     *estate;
> +     TupleTable      tupleTable;
> +     TupleTableSlot *slot;
> + } ExecContextData;
> + typedef ExecContextData *ExecContext;
> + 
> + static void
> + ExecContext_Init(ExecContext ec, Relation rel)
> + {
> +     TupleDesc       tupdesc = RelationGetDescr(rel);
> + 
> +     /*
> +      * We need a ResultRelInfo and an EState so we can use the regular
> +      * executor's index-entry-making machinery.
> +      */
> +     ec->estate = CreateExecutorState();
> + 
> +     ec->resultRelInfo = makeNode(ResultRelInfo);
> +     ec->resultRelInfo->ri_RangeTableIndex = 1;              /* dummy */
> +     ec->resultRelInfo->ri_RelationDesc = rel;
> +     ec->resultRelInfo->ri_TrigDesc = NULL;  /* we don't fire triggers */
> + 
> +     ExecOpenIndices(ec->resultRelInfo);
> + 
> +     ec->estate->es_result_relations = ec->resultRelInfo;
> +     ec->estate->es_num_result_relations = 1;
> +     ec->estate->es_result_relation_info = ec->resultRelInfo;
> + 
> +     /* Set up a dummy tuple table too */
> +     ec->tupleTable = ExecCreateTupleTable(1);
> +     ec->slot = ExecAllocTableSlot(ec->tupleTable);
> +     ExecSetSlotDescriptor(ec->slot, tupdesc, false);
> + }
> + 
> + static void
> + ExecContext_Finish(ExecContext ec)
> + {
> +     ExecDropTupleTable(ec->tupleTable, true);
> +     ExecCloseIndices(ec->resultRelInfo);
> +     FreeExecutorState(ec->estate);
> + }
> + /*
> +  * End of ExecContext Implementation
> +  *----------------------------------------------------------------------
> +  */
>   
>   static MemoryContext vac_context = NULL;
>   
> ***************
> *** 122,127 ****
> --- 180,196 ----
>   static void repair_frag(VRelStats *vacrelstats, Relation onerel,
>                       VacPageList vacuum_pages, VacPageList fraged_pages,
>                       int nindexes, Relation *Irel);
> + static void move_chain_tuple(Relation rel,
> +                                      Buffer old_buf, Page old_page, HeapTuple 
> old_tup,
> +                                      Buffer dst_buf, Page dst_page, VacPage 
> dst_vacpage,
> +                                      ExecContext ec, ItemPointer ctid, bool 
> cleanVpd);
> + static void move_plain_tuple(Relation rel,
> +                                      Buffer old_buf, Page old_page, HeapTuple 
> old_tup,
> +                                      Buffer dst_buf, Page dst_page, VacPage 
> dst_vacpage,
> +                                      ExecContext ec);
> + static void update_hint_bits(Relation rel, VacPageList fraged_pages,
> +                                     int num_fraged_pages, BlockNumber 
> last_move_dest_block,
> +                                     int num_moved);
>   static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
>                       VacPageList vacpagelist);
>   static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
> ***************
> *** 675,681 ****
>   static void
>   vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
>   {
> !     TransactionId myXID;
>       Relation        relation;
>       HeapScanDesc scan;
>       HeapTuple       tuple;
> --- 744,750 ----
>   static void
>   vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
>   {
> !     TransactionId myXID = GetCurrentTransactionId();
>       Relation        relation;
>       HeapScanDesc scan;
>       HeapTuple       tuple;
> ***************
> *** 683,689 ****
>       bool            vacuumAlreadyWrapped = false;
>       bool            frozenAlreadyWrapped = false;
>   
> -     myXID = GetCurrentTransactionId();
>   
>       relation = heap_openr(DatabaseRelationName, AccessShareLock);
>   
> --- 752,757 ----
> ***************
> *** 1059,1075 ****
>   {
>       BlockNumber nblocks,
>                               blkno;
> -     ItemId          itemid;
> -     Buffer          buf;
>       HeapTupleData tuple;
> -     OffsetNumber offnum,
> -                             maxoff;
> -     bool            pgchanged,
> -                             tupgone,
> -                             notup;
>       char       *relname;
> !     VacPage         vacpage,
> !                             vacpagecopy;
>       BlockNumber empty_pages,
>                               empty_end_pages;
>       double          num_tuples,
> --- 1127,1135 ----
>   {
>       BlockNumber nblocks,
>                               blkno;
>       HeapTupleData tuple;
>       char       *relname;
> !     VacPage         vacpage;
>       BlockNumber empty_pages,
>                               empty_end_pages;
>       double          num_tuples,
> ***************
> *** 1080,1086 ****
>                               usable_free_space;
>       Size            min_tlen = MaxTupleSize;
>       Size            max_tlen = 0;
> -     int                     i;
>       bool            do_shrinking = true;
>       VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
>       int                     num_vtlinks = 0;
> --- 1140,1145 ----
> ***************
> *** 1113,1118 ****
> --- 1172,1182 ----
>                                       tempPage = NULL;
>               bool            do_reap,
>                                       do_frag;
> +             Buffer          buf;
> +             OffsetNumber offnum,
> +                                     maxoff;
> +             bool            pgchanged,
> +                                     notup;
>   
>               vacuum_delay_point();
>   
> ***************
> *** 1125,1130 ****
> --- 1189,1196 ----
>   
>               if (PageIsNew(page))
>               {
> +                     VacPage vacpagecopy;
> + 
>                       ereport(WARNING,
>                       (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
>                                       relname, blkno)));
> ***************
> *** 1142,1147 ****
> --- 1208,1215 ----
>   
>               if (PageIsEmpty(page))
>               {
> +                     VacPage vacpagecopy;
> + 
>                       vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) 
> page)->pd_lower;
>                       free_space += vacpage->free;
>                       empty_pages++;
> ***************
> *** 1161,1168 ****
>                        offnum = OffsetNumberNext(offnum))
>               {
>                       uint16          sv_infomask;
> ! 
> !                     itemid = PageGetItemId(page, offnum);
>   
>                       /*
>                        * Collect un-used items too - it's possible to have indexes
> --- 1229,1236 ----
>                        offnum = OffsetNumberNext(offnum))
>               {
>                       uint16          sv_infomask;
> !                     ItemId          itemid = PageGetItemId(page, offnum);
> !                     bool            tupgone = false;
>   
>                       /*
>                        * Collect un-used items too - it's possible to have indexes
> ***************
> *** 1180,1186 ****
>                       tuple.t_len = ItemIdGetLength(itemid);
>                       ItemPointerSet(&(tuple.t_self), blkno, offnum);
>   
> -                     tupgone = false;
>                       sv_infomask = tuple.t_data->t_infomask;
>   
>                       switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
> --- 1248,1253 ----
> ***************
> *** 1269,1275 ****
>                                       do_shrinking = false;
>                                       break;
>                               default:
> !                                     elog(ERROR, "unexpected 
> HeapTupleSatisfiesVacuum result");
>                                       break;
>                       }
>   
> --- 1336,1343 ----
>                                       do_shrinking = false;
>                                       break;
>                               default:
> !                                     /* unexpected HeapTupleSatisfiesVacuum result 
> */
> !                                     Assert(false);
>                                       break;
>                       }
>   
> ***************
> *** 1344,1350 ****
>   
>               if (do_reap || do_frag)
>               {
> !                     vacpagecopy = copy_vac_page(vacpage);
>                       if (do_reap)
>                               vpage_insert(vacuum_pages, vacpagecopy);
>                       if (do_frag)
> --- 1412,1418 ----
>   
>               if (do_reap || do_frag)
>               {
> !                     VacPage vacpagecopy = copy_vac_page(vacpage);
>                       if (do_reap)
>                               vpage_insert(vacuum_pages, vacpagecopy);
>                       if (do_frag)
> ***************
> *** 1390,1395 ****
> --- 1458,1465 ----
>        */
>       if (do_shrinking)
>       {
> +             int                     i;
> + 
>               Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
>               fraged_pages->num_pages -= empty_end_pages;
>               usable_free_space = 0;
> ***************
> *** 1453,1528 ****
>                       VacPageList vacuum_pages, VacPageList fraged_pages,
>                       int nindexes, Relation *Irel)
>   {
> !     TransactionId myXID;
> !     CommandId       myCID;
> !     Buffer          buf,
> !                             cur_buffer;
>       BlockNumber nblocks,
>                               blkno;
>       BlockNumber last_move_dest_block = 0,
>                               last_vacuum_block;
> !     Page            page,
> !                             ToPage = NULL;
> !     OffsetNumber offnum,
> !                             maxoff,
> !                             newoff,
> !                             max_offset;
> !     ItemId          itemid,
> !                             newitemid;
> !     HeapTupleData tuple,
> !                             newtup;
> !     TupleDesc       tupdesc;
> !     ResultRelInfo *resultRelInfo;
> !     EState     *estate;
> !     TupleTable      tupleTable;
> !     TupleTableSlot *slot;
>       VacPageListData Nvacpagelist;
> !     VacPage         cur_page = NULL,
>                               last_vacuum_page,
>                               vacpage,
>                          *curpage;
> -     int                     cur_item = 0;
>       int                     i;
> !     Size            tuple_len;
> !     int                     num_moved,
>                               num_fraged_pages,
>                               vacuumed_pages;
> !     int                     checked_moved,
> !                             num_tuples,
> !                             keep_tuples = 0;
> !     bool            isempty,
> !                             dowrite,
> !                             chain_tuple_moved;
>       VacRUsage       ru0;
>   
>       vac_init_rusage(&ru0);
>   
> !     myXID = GetCurrentTransactionId();
> !     myCID = GetCurrentCommandId();
> ! 
> !     tupdesc = RelationGetDescr(onerel);
> ! 
> !     /*
> !      * We need a ResultRelInfo and an EState so we can use the regular
> !      * executor's index-entry-making machinery.
> !      */
> !     estate = CreateExecutorState();
> ! 
> !     resultRelInfo = makeNode(ResultRelInfo);
> !     resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
> !     resultRelInfo->ri_RelationDesc = onerel;
> !     resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
> ! 
> !     ExecOpenIndices(resultRelInfo);
> ! 
> !     estate->es_result_relations = resultRelInfo;
> !     estate->es_num_result_relations = 1;
> !     estate->es_result_relation_info = resultRelInfo;
> ! 
> !     /* Set up a dummy tuple table too */
> !     tupleTable = ExecCreateTupleTable(1);
> !     slot = ExecAllocTableSlot(tupleTable);
> !     ExecSetSlotDescriptor(slot, tupdesc, false);
>   
>       Nvacpagelist.num_pages = 0;
>       num_fraged_pages = fraged_pages->num_pages;
> --- 1523,1551 ----
>                       VacPageList vacuum_pages, VacPageList fraged_pages,
>                       int nindexes, Relation *Irel)
>   {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     Buffer          dst_buffer = InvalidBuffer;
>       BlockNumber nblocks,
>                               blkno;
>       BlockNumber last_move_dest_block = 0,
>                               last_vacuum_block;
> !     Page            dst_page = NULL;
> !     ExecContextData ec;
>       VacPageListData Nvacpagelist;
> !     VacPage         dst_vacpage = NULL,
>                               last_vacuum_page,
>                               vacpage,
>                          *curpage;
>       int                     i;
> !     int                     num_moved = 0,
>                               num_fraged_pages,
>                               vacuumed_pages;
> !     int                     keep_tuples = 0;
>       VacRUsage       ru0;
>   
>       vac_init_rusage(&ru0);
>   
> !     ExecContext_Init(&ec, onerel);
>   
>       Nvacpagelist.num_pages = 0;
>       num_fraged_pages = fraged_pages->num_pages;
> ***************
> *** 1539,1546 ****
>               last_vacuum_page = NULL;
>               last_vacuum_block = InvalidBlockNumber;
>       }
> -     cur_buffer = InvalidBuffer;
> -     num_moved = 0;
>   
>       vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * 
> sizeof(OffsetNumber));
>       vacpage->offsets_used = vacpage->offsets_free = 0;
> --- 1562,1567 ----
> ***************
> *** 1560,1565 ****
> --- 1581,1594 ----
>                blkno > last_move_dest_block;
>                blkno--)
>       {
> +             Buffer                  buf;
> +             Page                    page;
> +             OffsetNumber    offnum,
> +                                             maxoff;
> +             bool                    isempty,
> +                                             dowrite,
> +                                             chain_tuple_moved;
> + 
>               vacuum_delay_point();
>   
>               /*
> ***************
> *** 1635,1641 ****
>                        offnum <= maxoff;
>                        offnum = OffsetNumberNext(offnum))
>               {
> !                     itemid = PageGetItemId(page, offnum);
>   
>                       if (!ItemIdIsUsed(itemid))
>                               continue;
> --- 1664,1672 ----
>                        offnum <= maxoff;
>                        offnum = OffsetNumberNext(offnum))
>               {
> !                     Size                    tuple_len;
> !                     HeapTupleData   tuple;
> !                     ItemId                  itemid = PageGetItemId(page, offnum);
>   
>                       if (!ItemIdIsUsed(itemid))
>                               continue;
> ***************
> *** 1645,1689 ****
>                       tuple_len = tuple.t_len = ItemIdGetLength(itemid);
>                       ItemPointerSet(&(tuple.t_self), blkno, offnum);
>   
>                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
>                       {
> !                             if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                                     elog(ERROR, "HEAP_MOVED_IN was not expected");
>   
>                               /*
>                                * If this (chain) tuple is moved by me already then I
>                                * have to check is it in vacpage or not - i.e. is it
>                                * moved while cleaning this page or some previous one.
>                                */
> !                             if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> !                             {
> !                                     if (HeapTupleHeaderGetXvac(tuple.t_data) != 
> myXID)
> !                                             elog(ERROR, "invalid XVAC in tuple 
> header");
> !                                     if (keep_tuples == 0)
> !                                             continue;
> !                                     if (chain_tuple_moved)          /* some chains 
> was moved
> !                                                                                    
>           * while */
> !                                     {                       /* cleaning this page 
> */
> !                                             Assert(vacpage->offsets_free > 0);
> !                                             for (i = 0; i < vacpage->offsets_free; 
> i++)
> !                                             {
> !                                                     if (vacpage->offsets[i] == 
> offnum)
> !                                                             break;
> !                                             }
> !                                             if (i >= vacpage->offsets_free) /* not 
> found */
> !                                             {
> !                                                     
> vacpage->offsets[vacpage->offsets_free++] = offnum;
> !                                                     keep_tuples--;
> !                                             }
>                                       }
> !                                     else
>                                       {
>                                               
> vacpage->offsets[vacpage->offsets_free++] = offnum;
>                                               keep_tuples--;
>                                       }
> -                                     continue;
>                               }
> !                             elog(ERROR, "HEAP_MOVED_OFF was expected");
>                       }
>   
>                       /*
> --- 1676,1746 ----
>                       tuple_len = tuple.t_len = ItemIdGetLength(itemid);
>                       ItemPointerSet(&(tuple.t_self), blkno, offnum);
>   
> +                     /*
> +                      * VACUUM FULL has an exclusive lock on the relation.  So
> +                      * normally no other transaction can have pending INSERTs or
> +                      * DELETEs in this relation.  A tuple is either
> +                      *   (a) a tuple in a system catalog, inserted or deleted by
> +                      *       a not yet committed transaction or
> +                      *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
> +                      *   (c) inserted by a committed xact (XMIN_COMMITTED) or
> +                      *   (d) moved by the currently running VACUUM.
> +                      * In case (a) we wouldn't be in repair_frag() at all.
> +                      * In case (b) we cannot be here, because scan_heap() has
> +                      * already marked the item as unused, see continue above.
> +                      * Case (c) is what normally is to be expected.
> +                      * Case (d) is only possible, if a whole tuple chain has been
> +                      * moved while processing this or a higher numbered block.
> +                      */
>                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
>                       {
> !                             /*
> !                              * There cannot be another concurrently running 
> VACUUM.  If
> !                              * the tuple had been moved in by a previous VACUUM, 
> the
> !                              * visibility check would have set XMIN_COMMITTED.  If 
> the
> !                              * tuple had been moved in by the currently running 
> VACUUM,
> !                              * the loop would have been terminated.  We had
> !                              * elog(ERROR, ...) here, but as we are testing for a
> !                              * can't-happen condition, Assert() seems more 
> appropriate.
> !                              */
> !                             Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
>   
>                               /*
>                                * If this (chain) tuple is moved by me already then I
>                                * have to check is it in vacpage or not - i.e. is it
>                                * moved while cleaning this page or some previous one.
>                                */
> !                             Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
> !                             /*
> !                              * MOVED_OFF by another VACUUM would have caused the
> !                              * visibility check to set XMIN_COMMITTED or 
> XMIN_INVALID.
> !                              */
> !                             Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
> ! 
> !                             /* Can't we Assert(keep_tuples > 0) here? */
> !                             if (keep_tuples == 0)
> !                                     continue;
> !                             if (chain_tuple_moved)          /* some chains was 
> moved
> !                                                                                    
>   * while */
> !                             {                       /* cleaning this page */
> !                                     Assert(vacpage->offsets_free > 0);
> !                                     for (i = 0; i < vacpage->offsets_free; i++)
> !                                     {
> !                                             if (vacpage->offsets[i] == offnum)
> !                                                     break;
>                                       }
> !                                     if (i >= vacpage->offsets_free) /* not found */
>                                       {
>                                               
> vacpage->offsets[vacpage->offsets_free++] = offnum;
>                                               keep_tuples--;
>                                       }
>                               }
> !                             else
> !                             {
> !                                     vacpage->offsets[vacpage->offsets_free++] = 
> offnum;
> !                                     keep_tuples--;
> !                             }
> !                             continue;
>                       }
>   
>                       /*
> ***************
> *** 1716,1723 ****
>                               Buffer          Cbuf = buf;
>                               bool            freeCbuf = false;
>                               bool            chain_move_failed = false;
> -                             Page            Cpage;
> -                             ItemId          Citemid;
>                               ItemPointerData Ctid;
>                               HeapTupleData tp = tuple;
>                               Size            tlen = tuple_len;
> --- 1773,1778 ----
> ***************
> *** 1728,1737 ****
>                               int                     to_item = 0;
>                               int                     ti;
>   
> !                             if (cur_buffer != InvalidBuffer)
>                               {
> !                                     WriteBuffer(cur_buffer);
> !                                     cur_buffer = InvalidBuffer;
>                               }
>   
>                               /* Quick exit if we have no vtlinks to search in */
> --- 1783,1792 ----
>                               int                     to_item = 0;
>                               int                     ti;
>   
> !                             if (dst_buffer != InvalidBuffer)
>                               {
> !                                     WriteBuffer(dst_buffer);
> !                                     dst_buffer = InvalidBuffer;
>                               }
>   
>                               /* Quick exit if we have no vtlinks to search in */
> ***************
> *** 1754,1759 ****
> --- 1809,1818 ----
>                                          !(ItemPointerEquals(&(tp.t_self),
>                                                                                  
> &(tp.t_data->t_ctid))))
>                               {
> +                                     Page            Cpage;
> +                                     ItemId          Citemid;
> +                                     ItemPointerData Ctid;
> + 
>                                       Ctid = tp.t_data->t_ctid;
>                                       if (freeCbuf)
>                                               ReleaseBuffer(Cbuf);
> ***************
> *** 1929,1940 ****
>                               }
>   
>                               /*
> !                              * Okay, move the whle tuple chain
>                                */
>                               ItemPointerSetInvalid(&Ctid);
>                               for (ti = 0; ti < num_vtmove; ti++)
>                               {
>                                       VacPage         destvacpage = 
> vtmove[ti].vacpage;
>   
>                                       /* Get page to move from */
>                                       tuple.t_self = vtmove[ti].tid;
> --- 1988,2001 ----
>                               }
>   
>                               /*
> !                              * Okay, move the whole tuple chain
>                                */
>                               ItemPointerSetInvalid(&Ctid);
>                               for (ti = 0; ti < num_vtmove; ti++)
>                               {
>                                       VacPage         destvacpage = 
> vtmove[ti].vacpage;
> +                                     Page            Cpage;
> +                                     ItemId          Citemid;
>   
>                                       /* Get page to move from */
>                                       tuple.t_self = vtmove[ti].tid;
> ***************
> *** 1942,1954 ****
>                                                        
> ItemPointerGetBlockNumber(&(tuple.t_self)));
>   
>                                       /* Get page to move to */
> !                                     cur_buffer = ReadBuffer(onerel, 
> destvacpage->blkno);
>   
> !                                     LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                                     if (cur_buffer != Cbuf)
>                                               LockBuffer(Cbuf, 
> BUFFER_LOCK_EXCLUSIVE);
>   
> !                                     ToPage = BufferGetPage(cur_buffer);
>                                       Cpage = BufferGetPage(Cbuf);
>   
>                                       Citemid = PageGetItemId(Cpage,
> --- 2003,2015 ----
>                                                        
> ItemPointerGetBlockNumber(&(tuple.t_self)));
>   
>                                       /* Get page to move to */
> !                                     dst_buffer = ReadBuffer(onerel, 
> destvacpage->blkno);
>   
> !                                     LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                                     if (dst_buffer != Cbuf)
>                                               LockBuffer(Cbuf, 
> BUFFER_LOCK_EXCLUSIVE);
>   
> !                                     dst_page = BufferGetPage(dst_buffer);
>                                       Cpage = BufferGetPage(Cbuf);
>   
>                                       Citemid = PageGetItemId(Cpage,
> ***************
> *** 1961,2081 ****
>                                        * make a copy of the source tuple, and then 
> mark the
>                                        * source tuple MOVED_OFF.
>                                        */
> !                                     heap_copytuple_with_tuple(&tuple, &newtup);
> ! 
> !                                     /*
> !                                      * register invalidation of source tuple in 
> catcaches.
> !                                      */
> !                                     CacheInvalidateHeapTuple(onerel, &tuple);
> ! 
> !                                     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !                                     START_CRIT_SECTION();
> ! 
> !                                     tuple.t_data->t_infomask &= 
> ~(HEAP_XMIN_COMMITTED |
> !                                                                                    
>            HEAP_XMIN_INVALID |
> !                                                                                    
>            HEAP_MOVED_IN);
> !                                     tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> !                                     HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> ! 
> !                                     /*
> !                                      * If this page was not used before - clean it.
> !                                      *
> !                                      * NOTE: a nasty bug used to lurk here.  It is 
> possible
> !                                      * for the source and destination pages to be 
> the same
> !                                      * (since this tuple-chain member can be on a 
> page
> !                                      * lower than the one we're currently 
> processing in
> !                                      * the outer loop).  If that's true, then after
> !                                      * vacuum_page() the source tuple will have 
> been
> !                                      * moved, and tuple.t_data will be pointing at
> !                                      * garbage.  Therefore we must do everything 
> that uses
> !                                      * tuple.t_data BEFORE this step!!
> !                                      *
> !                                      * This path is different from the other 
> callers of
> !                                      * vacuum_page, because we have already 
> incremented
> !                                      * the vacpage's offsets_used field to account 
> for the
> !                                      * tuple(s) we expect to move onto the page. 
> Therefore
> !                                      * vacuum_page's check for offsets_used == 0 
> is wrong.
> !                                      * But since that's a good debugging check for 
> all
> !                                      * other callers, we work around it here 
> rather than
> !                                      * remove it.
> !                                      */
> !                                     if (!PageIsEmpty(ToPage) && 
> vtmove[ti].cleanVpd)
> !                                     {
> !                                             int                     
> sv_offsets_used = destvacpage->offsets_used;
> ! 
> !                                             destvacpage->offsets_used = 0;
> !                                             vacuum_page(onerel, cur_buffer, 
> destvacpage);
> !                                             destvacpage->offsets_used = 
> sv_offsets_used;
> !                                     }
> ! 
> !                                     /*
> !                                      * Update the state of the copied tuple, and 
> store it
> !                                      * on the destination page.
> !                                      */
> !                                     newtup.t_data->t_infomask &= 
> ~(HEAP_XMIN_COMMITTED |
> !                                                                                    
>             HEAP_XMIN_INVALID |
> !                                                                                    
>             HEAP_MOVED_OFF);
> !                                     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !                                     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !                                     newoff = PageAddItem(ToPage,
> !                                                                              
> (Item) newtup.t_data,
> !                                                                              
> tuple_len,
> !                                                                              
> InvalidOffsetNumber,
> !                                                                              
> LP_USED);
> !                                     if (newoff == InvalidOffsetNumber)
> !                                     {
> !                                             elog(PANIC, "failed to add item with 
> len = %lu to page %u while moving tuple chain",
> !                                               (unsigned long) tuple_len, 
> destvacpage->blkno);
> !                                     }
> !                                     newitemid = PageGetItemId(ToPage, newoff);
> !                                     pfree(newtup.t_data);
> !                                     newtup.t_datamcxt = NULL;
> !                                     newtup.t_data = (HeapTupleHeader) 
> PageGetItem(ToPage, newitemid);
> !                                     ItemPointerSet(&(newtup.t_self), 
> destvacpage->blkno, newoff);
> ! 
> !                                     /* XLOG stuff */
> !                                     if (!onerel->rd_istemp)
> !                                     {
> !                                             XLogRecPtr      recptr =
> !                                             log_heap_move(onerel, Cbuf, 
> tuple.t_self,
> !                                                                       cur_buffer, 
> &newtup);
> ! 
> !                                             if (Cbuf != cur_buffer)
> !                                             {
> !                                                     PageSetLSN(Cpage, recptr);
> !                                                     PageSetSUI(Cpage, 
> ThisStartUpID);
> !                                             }
> !                                             PageSetLSN(ToPage, recptr);
> !                                             PageSetSUI(ToPage, ThisStartUpID);
> !                                     }
> !                                     else
> !                                     {
> !                                             /*
> !                                              * No XLOG record, but still need to 
> flag that XID
> !                                              * exists on disk
> !                                              */
> !                                             MyXactMadeTempRelUpdate = true;
> !                                     }
> ! 
> !                                     END_CRIT_SECTION();
>   
>                                       if (destvacpage->blkno > last_move_dest_block)
>                                               last_move_dest_block = 
> destvacpage->blkno;
>   
>                                       /*
> -                                      * Set new tuple's t_ctid pointing to itself 
> for last
> -                                      * tuple in chain, and to next tuple in chain
> -                                      * otherwise.
> -                                      */
> -                                     if (!ItemPointerIsValid(&Ctid))
> -                                             newtup.t_data->t_ctid = newtup.t_self;
> -                                     else
> -                                             newtup.t_data->t_ctid = Ctid;
> -                                     Ctid = newtup.t_self;
> - 
> -                                     num_moved++;
> - 
> -                                     /*
>                                        * Remember that we moved tuple from the 
> current page
>                                        * (corresponding index tuple will be cleaned).
>                                        */
> --- 2022,2036 ----
>                                        * make a copy of the source tuple, and then 
> mark the
>                                        * source tuple MOVED_OFF.
>                                        */
> !                                     move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
> !                                                                      dst_buffer, 
> dst_page, destvacpage,
> !                                                                      &ec, &Ctid, 
> vtmove[ti].cleanVpd);
>   
> +                                     num_moved++;
>                                       if (destvacpage->blkno > last_move_dest_block)
>                                               last_move_dest_block = 
> destvacpage->blkno;
>   
>                                       /*
>                                        * Remember that we moved tuple from the 
> current page
>                                        * (corresponding index tuple will be cleaned).
>                                        */
> ***************
> *** 2085,2107 ****
>                                       else
>                                               keep_tuples++;
>   
> !                                     LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> !                                     if (cur_buffer != Cbuf)
> !                                             LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
> ! 
> !                                     /* Create index entries for the moved tuple */
> !                                     if (resultRelInfo->ri_NumIndices > 0)
> !                                     {
> !                                             ExecStoreTuple(&newtup, slot, 
> InvalidBuffer, false);
> !                                             ExecInsertIndexTuples(slot, 
> &(newtup.t_self),
> !                                                                                    
>    estate, true);
> !                                     }
> ! 
> !                                     WriteBuffer(cur_buffer);
>                                       WriteBuffer(Cbuf);
>                               }                               /* end of 
> move-the-tuple-chain loop */
>   
> !                             cur_buffer = InvalidBuffer;
>                               pfree(vtmove);
>                               chain_tuple_moved = true;
>   
> --- 2040,2050 ----
>                                       else
>                                               keep_tuples++;
>   
> !                                     WriteBuffer(dst_buffer);
>                                       WriteBuffer(Cbuf);
>                               }                               /* end of 
> move-the-tuple-chain loop */
>   
> !                             dst_buffer = InvalidBuffer;
>                               pfree(vtmove);
>                               chain_tuple_moved = true;
>   
> ***************
> *** 2110,2122 ****
>                       }                                       /* end of 
> is-tuple-in-chain test */
>   
>                       /* try to find new page for this tuple */
> !                     if (cur_buffer == InvalidBuffer ||
> !                             !enough_space(cur_page, tuple_len))
>                       {
> !                             if (cur_buffer != InvalidBuffer)
>                               {
> !                                     WriteBuffer(cur_buffer);
> !                                     cur_buffer = InvalidBuffer;
>                               }
>                               for (i = 0; i < num_fraged_pages; i++)
>                               {
> --- 2053,2065 ----
>                       }                                       /* end of 
> is-tuple-in-chain test */
>   
>                       /* try to find new page for this tuple */
> !                     if (dst_buffer == InvalidBuffer ||
> !                             !enough_space(dst_vacpage, tuple_len))
>                       {
> !                             if (dst_buffer != InvalidBuffer)
>                               {
> !                                     WriteBuffer(dst_buffer);
> !                                     dst_buffer = InvalidBuffer;
>                               }
>                               for (i = 0; i < num_fraged_pages; i++)
>                               {
> ***************
> *** 2125,2234 ****
>                               }
>                               if (i == num_fraged_pages)
>                                       break;          /* can't move item anywhere */
> !                             cur_item = i;
> !                             cur_page = fraged_pages->pagedesc[cur_item];
> !                             cur_buffer = ReadBuffer(onerel, cur_page->blkno);
> !                             LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                             ToPage = BufferGetPage(cur_buffer);
>                               /* if this page was not used before - clean it */
> !                             if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 
> 0)
> !                                     vacuum_page(onerel, cur_buffer, cur_page);
>                       }
>                       else
> !                             LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
>   
>                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>   
> !                     /* copy tuple */
> !                     heap_copytuple_with_tuple(&tuple, &newtup);
> ! 
> !                     /*
> !                      * register invalidation of source tuple in catcaches.
> !                      *
> !                      * (Note: we do not need to register the copied tuple, because 
> we
> !                      * are not changing the tuple contents and so there cannot be
> !                      * any need to flush negative catcache entries.)
> !                      */
> !                     CacheInvalidateHeapTuple(onerel, &tuple);
>   
> -                     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> -                     START_CRIT_SECTION();
>   
> !                     /*
> !                      * Mark new tuple as MOVED_IN by me.
> !                      */
> !                     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                                                
> HEAP_XMIN_INVALID |
> !                                                                                
> HEAP_MOVED_OFF);
> !                     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !                     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> ! 
> !                     /* add tuple to the page */
> !                     newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
> !                                                              InvalidOffsetNumber, 
> LP_USED);
> !                     if (newoff == InvalidOffsetNumber)
> !                     {
> !                             elog(PANIC, "failed to add item with len = %lu to page 
> %u (free space %lu, nusd %u, noff %u)",
> !                                      (unsigned long) tuple_len,
> !                                      cur_page->blkno, (unsigned long) 
> cur_page->free,
> !                                      cur_page->offsets_used, 
> cur_page->offsets_free);
> !                     }
> !                     newitemid = PageGetItemId(ToPage, newoff);
> !                     pfree(newtup.t_data);
> !                     newtup.t_datamcxt = NULL;
> !                     newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, 
> newitemid);
> !                     ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, 
> newoff);
> !                     newtup.t_self = newtup.t_data->t_ctid;
>   
>                       /*
> !                      * Mark old tuple as MOVED_OFF by me.
>                        */
> -                     tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> -                                                                               
> HEAP_XMIN_INVALID |
> -                                                                               
> HEAP_MOVED_IN);
> -                     tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> -                     HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> - 
> -                     /* XLOG stuff */
> -                     if (!onerel->rd_istemp)
> -                     {
> -                             XLogRecPtr      recptr =
> -                             log_heap_move(onerel, buf, tuple.t_self,
> -                                                       cur_buffer, &newtup);
> - 
> -                             PageSetLSN(page, recptr);
> -                             PageSetSUI(page, ThisStartUpID);
> -                             PageSetLSN(ToPage, recptr);
> -                             PageSetSUI(ToPage, ThisStartUpID);
> -                     }
> -                     else
> -                     {
> -                             /*
> -                              * No XLOG record, but still need to flag that XID 
> exists
> -                              * on disk
> -                              */
> -                             MyXactMadeTempRelUpdate = true;
> -                     }
> - 
> -                     END_CRIT_SECTION();
> - 
> -                     cur_page->offsets_used++;
> -                     num_moved++;
> -                     cur_page->free = ((PageHeader) ToPage)->pd_upper - 
> ((PageHeader) ToPage)->pd_lower;
> -                     if (cur_page->blkno > last_move_dest_block)
> -                             last_move_dest_block = cur_page->blkno;
> - 
>                       vacpage->offsets[vacpage->offsets_free++] = offnum;
> - 
> -                     LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> -                     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> - 
> -                     /* insert index' tuples if needed */
> -                     if (resultRelInfo->ri_NumIndices > 0)
> -                     {
> -                             ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
> -                             ExecInsertIndexTuples(slot, &(newtup.t_self), estate, 
> true);
> -                     }
>               }                                               /* walk along page */
>   
>               /*
> --- 2068,2099 ----
>                               }
>                               if (i == num_fraged_pages)
>                                       break;          /* can't move item anywhere */
> !                             dst_vacpage = fraged_pages->pagedesc[i];
> !                             dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
> !                             LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                             dst_page = BufferGetPage(dst_buffer);
>                               /* if this page was not used before - clean it */
> !                             if (!PageIsEmpty(dst_page) && 
> dst_vacpage->offsets_used == 0)
> !                                     vacuum_page(onerel, dst_buffer, dst_vacpage);
>                       }
>                       else
> !                             LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
>   
>                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>   
> !                     move_plain_tuple(onerel, buf, page, &tuple,
> !                                                      dst_buffer, dst_page, 
> dst_vacpage, &ec);
>   
>   
> !                     num_moved++;
> !                     if (dst_vacpage->blkno > last_move_dest_block)
> !                             last_move_dest_block = dst_vacpage->blkno;
>   
>                       /*
> !                      * Remember that we moved tuple from the current page
> !                      * (corresponding index tuple will be cleaned).
>                        */
>                       vacpage->offsets[vacpage->offsets_free++] = offnum;
>               }                                               /* walk along page */
>   
>               /*
> ***************
> *** 2249,2284 ****
>                                off <= maxoff;
>                                off = OffsetNumberNext(off))
>                       {
> !                             itemid = PageGetItemId(page, off);
>                               if (!ItemIdIsUsed(itemid))
>                                       continue;
> !                             tuple.t_datamcxt = NULL;
> !                             tuple.t_data = (HeapTupleHeader) PageGetItem(page, 
> itemid);
> !                             if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
>                                       continue;
> !                             if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                                     elog(ERROR, "HEAP_MOVED_IN was not expected");
> !                             if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
>                               {
> !                                     if (HeapTupleHeaderGetXvac(tuple.t_data) != 
> myXID)
> !                                             elog(ERROR, "invalid XVAC in tuple 
> header");
> !                                     /* some chains was moved while */
> !                                     if (chain_tuple_moved)
> !                                     {                       /* cleaning this page 
> */
> !                                             Assert(vacpage->offsets_free > 0);
> !                                             for (i = 0; i < vacpage->offsets_free; 
> i++)
> !                                             {
> !                                                     if (vacpage->offsets[i] == off)
> !                                                             break;
> !                                             }
> !                                             if (i >= vacpage->offsets_free) /* not 
> found */
> !                                             {
> !                                                     
> vacpage->offsets[vacpage->offsets_free++] = off;
> !                                                     Assert(keep_tuples > 0);
> !                                                     keep_tuples--;
> !                                             }
>                                       }
> !                                     else
>                                       {
>                                               
> vacpage->offsets[vacpage->offsets_free++] = off;
>                                               Assert(keep_tuples > 0);
> --- 2114,2144 ----
>                                off <= maxoff;
>                                off = OffsetNumberNext(off))
>                       {
> !                             ItemId  itemid = PageGetItemId(page, off);
> !                             HeapTupleHeader htup;
> ! 
>                               if (!ItemIdIsUsed(itemid))
>                                       continue;
> !                             htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !                             if (htup->t_infomask & HEAP_XMIN_COMMITTED)
>                                       continue;
> !                             /*
> !                             ** See comments in the walk-along-page loop above, why 
> we
> !                             ** have Asserts here instead of if (...) elog(ERROR).
> !                             */
> !                             Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> !                             Assert(htup->t_infomask & HEAP_MOVED_OFF);
> !                             Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> !                             if (chain_tuple_moved)
>                               {
> !                                     /* some chains was moved while cleaning this 
> page */
> !                                     Assert(vacpage->offsets_free > 0);
> !                                     for (i = 0; i < vacpage->offsets_free; i++)
> !                                     {
> !                                             if (vacpage->offsets[i] == off)
> !                                                     break;
>                                       }
> !                                     if (i >= vacpage->offsets_free) /* not found */
>                                       {
>                                               
> vacpage->offsets[vacpage->offsets_free++] = off;
>                                               Assert(keep_tuples > 0);
> ***************
> *** 2286,2292 ****
>                                       }
>                               }
>                               else
> !                                     elog(ERROR, "HEAP_MOVED_OFF was expected");
>                       }
>               }
>   
> --- 2146,2156 ----
>                                       }
>                               }
>                               else
> !                             {
> !                                     vacpage->offsets[vacpage->offsets_free++] = 
> off;
> !                                     Assert(keep_tuples > 0);
> !                                     keep_tuples--;
> !                             }
>                       }
>               }
>   
> ***************
> *** 2312,2321 ****
>   
>       blkno++;                                        /* new number of blocks */
>   
> !     if (cur_buffer != InvalidBuffer)
>       {
>               Assert(num_moved > 0);
> !             WriteBuffer(cur_buffer);
>       }
>   
>       if (num_moved > 0)
> --- 2176,2185 ----
>   
>       blkno++;                                        /* new number of blocks */
>   
> !     if (dst_buffer != InvalidBuffer)
>       {
>               Assert(num_moved > 0);
> !             WriteBuffer(dst_buffer);
>       }
>   
>       if (num_moved > 0)
> ***************
> *** 2348,2353 ****
> --- 2212,2220 ----
>               Assert((*curpage)->blkno < blkno);
>               if ((*curpage)->offsets_used == 0)
>               {
> +                     Buffer          buf;
> +                     Page            page;
> + 
>                       /* this page was not used as a move target, so must clean it */
>                       buf = ReadBuffer(onerel, (*curpage)->blkno);
>                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> ***************
> *** 2363,2424 ****
>        * Now scan all the pages that we moved tuples onto and update tuple
>        * status bits.  This is not really necessary, but will save time for
>        * future transactions examining these tuples.
> -      *
> -      * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> -      * pages that were move source pages but not move dest pages.  One
> -      * also wonders whether it wouldn't be better to skip this step and
> -      * let the tuple status updates happen someplace that's not holding an
> -      * exclusive lock on the relation.
>        */
> !     checked_moved = 0;
> !     for (i = 0, curpage = fraged_pages->pagedesc;
> !              i < num_fraged_pages;
> !              i++, curpage++)
> !     {
> !             vacuum_delay_point();
> ! 
> !             Assert((*curpage)->blkno < blkno);
> !             if ((*curpage)->blkno > last_move_dest_block)
> !                     break;                          /* no need to scan any further 
> */
> !             if ((*curpage)->offsets_used == 0)
> !                     continue;                       /* this page was never used as 
> a move dest */
> !             buf = ReadBuffer(onerel, (*curpage)->blkno);
> !             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> !             page = BufferGetPage(buf);
> !             num_tuples = 0;
> !             max_offset = PageGetMaxOffsetNumber(page);
> !             for (newoff = FirstOffsetNumber;
> !                      newoff <= max_offset;
> !                      newoff = OffsetNumberNext(newoff))
> !             {
> !                     itemid = PageGetItemId(page, newoff);
> !                     if (!ItemIdIsUsed(itemid))
> !                             continue;
> !                     tuple.t_datamcxt = NULL;
> !                     tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
> !                     if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> !                     {
> !                             if (!(tuple.t_data->t_infomask & HEAP_MOVED))
> !                                     elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was 
> expected");
> !                             if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                                     elog(ERROR, "invalid XVAC in tuple header");
> !                             if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                             {
> !                                     tuple.t_data->t_infomask |= 
> HEAP_XMIN_COMMITTED;
> !                                     tuple.t_data->t_infomask &= ~HEAP_MOVED;
> !                                     num_tuples++;
> !                             }
> !                             else
> !                                     tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
> !                     }
> !             }
> !             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> !             WriteBuffer(buf);
> !             Assert((*curpage)->offsets_used == num_tuples);
> !             checked_moved += num_tuples;
> !     }
> !     Assert(num_moved == checked_moved);
> ! 
>       /*
>        * It'd be cleaner to make this report at the bottom of this routine,
>        * but then the rusage would double-count the second pass of index
> --- 2230,2239 ----
>        * Now scan all the pages that we moved tuples onto and update tuple
>        * status bits.  This is not really necessary, but will save time for
>        * future transactions examining these tuples.
>        */
> !     update_hint_bits(onerel, fraged_pages, num_fraged_pages,
> !                                      last_move_dest_block, num_moved);
> !   
>       /*
>        * It'd be cleaner to make this report at the bottom of this routine,
>        * but then the rusage would double-count the second pass of index
> ***************
> *** 2455,2460 ****
> --- 2270,2282 ----
>                               *vpleft = *vpright;
>                               *vpright = vpsave;
>                       }
> +                     /*
> +                      * keep_tuples is the number of tuples that have been moved
> +                      * off a page during chain moves but not been scanned over
> +                      * subsequently.  The tuple ids of these tuples are not
> +                      * recorded as free offsets for any VacPage, so they will not
> +                      * be cleared from the indexes.
> +                      */
>                       Assert(keep_tuples >= 0);
>                       for (i = 0; i < nindexes; i++)
>                               vacuum_index(&Nvacpagelist, Irel[i],
> ***************
> *** 2465,2500 ****
>               if (vacpage->blkno == (blkno - 1) &&
>                       vacpage->offsets_free > 0)
>               {
> !                     OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
> !                     int                     uncnt;
>   
>                       buf = ReadBuffer(onerel, vacpage->blkno);
>                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>                       page = BufferGetPage(buf);
> -                     num_tuples = 0;
>                       maxoff = PageGetMaxOffsetNumber(page);
>                       for (offnum = FirstOffsetNumber;
>                                offnum <= maxoff;
>                                offnum = OffsetNumberNext(offnum))
>                       {
> !                             itemid = PageGetItemId(page, offnum);
>                               if (!ItemIdIsUsed(itemid))
>                                       continue;
> !                             tuple.t_datamcxt = NULL;
> !                             tuple.t_data = (HeapTupleHeader) PageGetItem(page, 
> itemid);
>   
> !                             if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> !                             {
> !                                     if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> !                                     {
> !                                             if 
> (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                                                     elog(ERROR, "invalid XVAC in 
> tuple header");
> !                                             itemid->lp_flags &= ~LP_USED;
> !                                             num_tuples++;
> !                                     }
> !                                     else
> !                                             elog(ERROR, "HEAP_MOVED_OFF was 
> expected");
> !                             }
>   
>                       }
>                       Assert(vacpage->offsets_free == num_tuples);
> --- 2287,2327 ----
>               if (vacpage->blkno == (blkno - 1) &&
>                       vacpage->offsets_free > 0)
>               {
> !                     Buffer                  buf;
> !                     Page                    page;
> !                     OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
> !                     OffsetNumber    offnum,
> !                                                     maxoff;
> !                     int                             uncnt;
> !                     int                             num_tuples = 0;
>   
>                       buf = ReadBuffer(onerel, vacpage->blkno);
>                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>                       page = BufferGetPage(buf);
>                       maxoff = PageGetMaxOffsetNumber(page);
>                       for (offnum = FirstOffsetNumber;
>                                offnum <= maxoff;
>                                offnum = OffsetNumberNext(offnum))
>                       {
> !                             ItemId  itemid = PageGetItemId(page, offnum);
> !                             HeapTupleHeader htup;
> ! 
>                               if (!ItemIdIsUsed(itemid))
>                                       continue;
> !                             htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !                             if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> !                                     continue;
>   
> !                             /*
> !                             ** See comments in the walk-along-page loop above, why 
> we
> !                             ** have Asserts here instead of if (...) elog(ERROR).
> !                             */
> !                             Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> !                             Assert(htup->t_infomask & HEAP_MOVED_OFF);
> !                             Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> ! 
> !                             itemid->lp_flags &= ~LP_USED;
> !                             num_tuples++;
>   
>                       }
>                       Assert(vacpage->offsets_free == num_tuples);
> ***************
> *** 2554,2564 ****
>       if (vacrelstats->vtlinks != NULL)
>               pfree(vacrelstats->vtlinks);
>   
> !     ExecDropTupleTable(tupleTable, true);
>   
> !     ExecCloseIndices(resultRelInfo);
>   
> !     FreeExecutorState(estate);
>   }
>   
>   /*
> --- 2381,2717 ----
>       if (vacrelstats->vtlinks != NULL)
>               pfree(vacrelstats->vtlinks);
>   
> !     ExecContext_Finish(&ec);
> ! }
> ! 
> ! /*
> !  *  move_chain_tuple() -- move one tuple that is part of a tuple chain
> !  *
> !  *          This routine moves old_tup from old_page to dst_page.
> !  *          old_page and dst_page might be the same page.
> !  *          On entry old_buf and dst_buf are locked exclusively, both locks (or
> !  *          the single lock, if this is a intra-page-move) are released before
> !  *          exit.
> !  *
> !  *          Yes, a routine with ten parameters is ugly, but it's still better
> !  *          than having these 120 lines of code in repair_frag() which is
> !  *          already too long and almost unreadable.
> !  */
> ! static void
> ! move_chain_tuple(Relation rel,
> !                              Buffer old_buf, Page old_page, HeapTuple old_tup,
> !                              Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> !                              ExecContext ec, ItemPointer ctid, bool cleanVpd)
> ! {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     HeapTupleData   newtup;
> !     OffsetNumber    newoff;
> !     ItemId                  newitemid;
> !     Size                    tuple_len = old_tup->t_len;
> ! 
> !     heap_copytuple_with_tuple(old_tup, &newtup);
> ! 
> !     /*
> !      * register invalidation of source tuple in catcaches.
> !      */
> !     CacheInvalidateHeapTuple(rel, old_tup);
> ! 
> !     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !     START_CRIT_SECTION();
> ! 
> !     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                               HEAP_XMIN_INVALID |
> !                                                               HEAP_MOVED_IN);
> !     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> !     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> ! 
> !     /*
> !      * If this page was not used before - clean it.
> !      *
> !      * NOTE: a nasty bug used to lurk here.  It is possible
> !      * for the source and destination pages to be the same
> !      * (since this tuple-chain member can be on a page
> !      * lower than the one we're currently processing in
> !      * the outer loop).  If that's true, then after
> !      * vacuum_page() the source tuple will have been
> !      * moved, and tuple.t_data will be pointing at
> !      * garbage.  Therefore we must do everything that uses
> !      * old_tup->t_data BEFORE this step!!
> !      *
> !      * This path is different from the other callers of
> !      * vacuum_page, because we have already incremented
> !      * the vacpage's offsets_used field to account for the
> !      * tuple(s) we expect to move onto the page. Therefore
> !      * vacuum_page's check for offsets_used == 0 is wrong.
> !      * But since that's a good debugging check for all
> !      * other callers, we work around it here rather than
> !      * remove it.
> !      */
> !     if (!PageIsEmpty(dst_page) && cleanVpd)
> !     {
> !             int             sv_offsets_used = dst_vacpage->offsets_used;
> ! 
> !             dst_vacpage->offsets_used = 0;
> !             vacuum_page(rel, dst_buf, dst_vacpage);
> !             dst_vacpage->offsets_used = sv_offsets_used;
> !     }
>   
> !     /*
> !      * Update the state of the copied tuple, and store it
> !      * on the destination page.
> !      */
> !     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                                HEAP_XMIN_INVALID |
> !                                                                HEAP_MOVED_OFF);
> !     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> !                                              InvalidOffsetNumber, LP_USED);
> !     if (newoff == InvalidOffsetNumber)
> !     {
> !             elog(PANIC, "failed to add item with len = %lu to page %u while moving 
> tuple chain",
> !               (unsigned long) tuple_len, dst_vacpage->blkno);
> !     }
> !     newitemid = PageGetItemId(dst_page, newoff);
> !     pfree(newtup.t_data);
> !     newtup.t_datamcxt = NULL;
> !     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> !     ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
> ! 
> !     /* XLOG stuff */
> !     if (!rel->rd_istemp)
> !     {
> !             XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> !                                                                                
> dst_buf, &newtup);
> ! 
> !             if (old_buf != dst_buf)
> !             {
> !                     PageSetLSN(old_page, recptr);
> !                     PageSetSUI(old_page, ThisStartUpID);
> !             }
> !             PageSetLSN(dst_page, recptr);
> !             PageSetSUI(dst_page, ThisStartUpID);
> !     }
> !     else
> !     {
> !             /*
> !              * No XLOG record, but still need to flag that XID
> !              * exists on disk
> !              */
> !             MyXactMadeTempRelUpdate = true;
> !     }
> ! 
> !     END_CRIT_SECTION();
> ! 
> !     /*
> !      * Set new tuple's t_ctid pointing to itself for last
> !      * tuple in chain, and to next tuple in chain
> !      * otherwise.
> !      */
> !     /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
> !     if (!ItemPointerIsValid(ctid))
> !             newtup.t_data->t_ctid = newtup.t_self;
> !     else
> !             newtup.t_data->t_ctid = *ctid;
> !     *ctid = newtup.t_self;
>   
> !     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> !     if (dst_buf != old_buf)
> !             LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> ! 
> !     /* Create index entries for the moved tuple */
> !     if (ec->resultRelInfo->ri_NumIndices > 0)
> !     {
> !             ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> !             ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> !     }
> ! }
> ! 
> ! /*
> !  *  move_plain_tuple() -- move one tuple that is not part of a chain
> !  *
> !  *          This routine moves old_tup from old_page to dst_page.
> !  *          On entry old_buf and dst_buf are locked exclusively, both locks are
> !  *          released before exit.
> !  *
> !  *          Yes, a routine with eight parameters is ugly, but it's still better
> !  *          than having these 90 lines of code in repair_frag() which is already
> !  *          too long and almost unreadable.
> !  */
> ! static void
> ! move_plain_tuple(Relation rel,
> !                              Buffer old_buf, Page old_page, HeapTuple old_tup,
> !                              Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> !                              ExecContext ec)
> ! {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     HeapTupleData   newtup;
> !     OffsetNumber    newoff;
> !     ItemId                  newitemid;
> !     Size                    tuple_len = old_tup->t_len;
> ! 
> !     /* copy tuple */
> !     heap_copytuple_with_tuple(old_tup, &newtup);
> ! 
> !     /*
> !      * register invalidation of source tuple in catcaches.
> !      *
> !      * (Note: we do not need to register the copied tuple, because we
> !      * are not changing the tuple contents and so there cannot be
> !      * any need to flush negative catcache entries.)
> !      */
> !     CacheInvalidateHeapTuple(rel, old_tup);
> ! 
> !     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !     START_CRIT_SECTION();
> ! 
> !     /*
> !      * Mark new tuple as MOVED_IN by me.
> !      */
> !     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                                HEAP_XMIN_INVALID |
> !                                                                HEAP_MOVED_OFF);
> !     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> ! 
> !     /* add tuple to the page */
> !     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> !                                              InvalidOffsetNumber, LP_USED);
> !     if (newoff == InvalidOffsetNumber)
> !     {
> !             elog(PANIC, "failed to add item with len = %lu to page %u (free space 
> %lu, nusd %u, noff %u)",
> !                      (unsigned long) tuple_len,
> !                      dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
> !                      dst_vacpage->offsets_used, dst_vacpage->offsets_free);
> !     }
> !     newitemid = PageGetItemId(dst_page, newoff);
> !     pfree(newtup.t_data);
> !     newtup.t_datamcxt = NULL;
> !     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> !     ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
> !     newtup.t_self = newtup.t_data->t_ctid;
> ! 
> !     /*
> !      * Mark old tuple as MOVED_OFF by me.
> !      */
> !     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                               HEAP_XMIN_INVALID |
> !                                                               HEAP_MOVED_IN);
> !     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> !     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> ! 
> !     /* XLOG stuff */
> !     if (!rel->rd_istemp)
> !     {
> !             XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> !                                                                                
> dst_buf, &newtup);
> ! 
> !             PageSetLSN(old_page, recptr);
> !             PageSetSUI(old_page, ThisStartUpID);
> !             PageSetLSN(dst_page, recptr);
> !             PageSetSUI(dst_page, ThisStartUpID);
> !     }
> !     else
> !     {
> !             /*
> !              * No XLOG record, but still need to flag that XID exists
> !              * on disk
> !              */
> !             MyXactMadeTempRelUpdate = true;
> !     }
> ! 
> !     END_CRIT_SECTION();
> ! 
> !     dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
> !                                             ((PageHeader) dst_page)->pd_lower;
> !     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> !     LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> ! 
> !     dst_vacpage->offsets_used++;
> ! 
> !     /* insert index' tuples if needed */
> !     if (ec->resultRelInfo->ri_NumIndices > 0)
> !     {
> !             ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> !             ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> !     }
> ! }
> ! 
> ! /*
> !  *  update_hint_bits() -- update hint bits in destination pages
> !  *
> !  * Scan all the pages that we moved tuples onto and update tuple
> !  * status bits.  This is not really necessary, but will save time for
> !  * future transactions examining these tuples.
> !  *
> !  * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> !  * pages that were move source pages but not move dest pages.  One
> !  * also wonders whether it wouldn't be better to skip this step and
> !  * let the tuple status updates happen someplace that's not holding an
> !  * exclusive lock on the relation.
> !  */
> ! static void
> ! update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
> !                              BlockNumber last_move_dest_block, int num_moved)
> ! {
> !     int                     checked_moved = 0;
> !     int                     i;
> !     VacPage    *curpage;
> ! 
> !     for (i = 0, curpage = fraged_pages->pagedesc;
> !              i < num_fraged_pages;
> !              i++, curpage++)
> !     {
> !             Buffer                  buf;
> !             Page                    page;
> !             OffsetNumber    max_offset;
> !             OffsetNumber    off;
> !             int                             num_tuples = 0;
> ! 
> !             vacuum_delay_point();
> ! 
> !             if ((*curpage)->blkno > last_move_dest_block)
> !                     break;                          /* no need to scan any further 
> */
> !             if ((*curpage)->offsets_used == 0)
> !                     continue;                       /* this page was never used as 
> a move dest */
> !             buf = ReadBuffer(rel, (*curpage)->blkno);
> !             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> !             page = BufferGetPage(buf);
> !             max_offset = PageGetMaxOffsetNumber(page);
> !             for (off = FirstOffsetNumber;
> !                      off <= max_offset;
> !                      off = OffsetNumberNext(off))
> !             {
> !                     ItemId  itemid = PageGetItemId(page, off);
> !                     HeapTupleHeader htup;
> ! 
> !                     if (!ItemIdIsUsed(itemid))
> !                             continue;
> !                     htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !                     if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> !                             continue;
> !                     /*
> !                      * See comments in the walk-along-page loop above, why we
> !                      * have Asserts here instead of if (...) elog(ERROR).  The
> !                      * difference here is that we may see MOVED_IN.
> !                      */
> !                     Assert(htup->t_infomask & HEAP_MOVED);
> !                     Assert(HeapTupleHeaderGetXvac(htup) == 
> GetCurrentTransactionId());
> !                     if (htup->t_infomask & HEAP_MOVED_IN)
> !                     {
> !                             htup->t_infomask |= HEAP_XMIN_COMMITTED;
> !                             htup->t_infomask &= ~HEAP_MOVED;
> !                             num_tuples++;
> !                     }
> !                     else
> !                             htup->t_infomask |= HEAP_XMIN_INVALID;
> !             }
> !             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> !             WriteBuffer(buf);
> !             Assert((*curpage)->offsets_used == num_tuples);
> !             checked_moved += num_tuples;
> !     }
> !     Assert(num_moved == checked_moved);
>   }
>   
>   /*

> 
> ---------------------------(end of broadcast)---------------------------
> TIP 6: Have you searched our list archives?
> 
>                http://archives.postgresql.org

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  [EMAIL PROTECTED]               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073

---------------------------(end of broadcast)---------------------------
TIP 2: you can get off all lists at once with the unregister command
    (send "unregister YourEmailAddressHere" to [EMAIL PROTECTED])

Reply via email to