Kyotaro HORIGUCHI wrote:

> The attached patch is quiiiccck-and-dirty-hack of Michael's patch
> just as a PoC of my proposal quoted above. This also passes the
> 006 test.  The major changes are the following.
> 
> - Moved sync_above and truncated_to into RelationData.

Interesting.  I wonder if it's possible that a relcache invalidation
would cause these values to get lost for some reason, because that would
be dangerous.

I suppose the rationale is that this shouldn't happen because any
operation that does things this way must hold an exclusive lock on the
relation.  But that doesn't guarantee that the relcache entry is
completely stable, does it?  If we can get proof of that, then this
technique should be safe, I think.
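
To illustrate -- and this is only a sketch of mine, not something either
patch does -- the rebuild path in RelationClearRelation() would have to
carry the new fields over to the rebuilt entry, next to the existing
SWAPFIELD() calls for rd_createSubid and rd_newRelfilenodeSubid:

    /* hypothetical addition to RelationClearRelation()'s rebuild path */
    SWAPFIELD(BlockNumber, sync_above);
    SWAPFIELD(BlockNumber, truncated_to);

Without something like that, a rebuild at the wrong moment would silently
forget the pending sync, and we'd skip WAL for data that never makes it
to disk.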

In your version of the patch, which I spent some time skimming, I am
missing comments on various functions.  I added some as I went along,
including one XXX indicating a spot that still needs to be filled in.

RecordPendingSync() should really live in relcache.c (and probably get a
different name).
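
For reference, the caller-side pattern the patch establishes (taken from
the copy.c and heapam.c hunks below) is to register the relation once and
then test per buffer:

    /* in CopyFrom(), per the patch */
    if (!XLogIsNeeded())
        heap_register_sync(cstate->rel);

    /* ... and at each page modification, instead of RelationNeedsWAL(): */
    if (BufferNeedsWAL(relation, buffer))
    {
        /* emit the WAL record */
    }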

> X I feel that I have dropped one of the features of the original
>   patch during the hack, but I don't recall it clearly now:(

Hah :-)

> X I haven't considered relfilenode replacement, which didn't matter
>   for the original patch. (but there's few places to consider).

Hmm ...  Please provide.
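
At the very least I'd expect the relfilenode-replacement path to forget
the pending-sync state, since it describes the old file.  Something like
this in RelationSetNewRelfilenode() -- hypothetical, not in the posted
patch:

    /* sync_above/truncated_to refer to the old relfilenode; reset them */
    relation->sync_above = InvalidBlockNumber;
    relation->truncated_to = InvalidBlockNumber;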

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0c3e2b0..aa1b97d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -34,6 +34,28 @@
  *       the POSTGRES heap access method used for all POSTGRES
  *       relations.
  *
+ * WAL CONSIDERATIONS
+ *       All heap operations are normally WAL-logged, but there are a few
+ *       exceptions. Temporary and unlogged relations never need to be
+ *       WAL-logged, but we can also skip WAL-logging for a table that was
+ *       created in the same transaction, if we don't need WAL for PITR or
+ *       WAL archival purposes (i.e. if wal_level=minimal), and we fsync()
+ *       the file to disk at COMMIT instead.
+ *
+ *       The same-relation optimization is not employed automatically on all
+ *       updates to a table that was created in the same transaction, because
+ *       for a small number of changes, it's cheaper to just create the WAL
+ *       records than to fsync() the whole relation at COMMIT. It is only
+ *       worthwhile for (presumably) large operations like COPY, CLUSTER,
+ *       or VACUUM FULL. Use heap_register_sync() to initiate such an
+ *       operation; it will cause any subsequent updates to the table to skip
+ *       WAL-logging, if possible, and cause the heap to be synced to disk at
+ *       COMMIT.
+ *
+ *       To make that work, all modifications to the heap must use
+ *       BufferNeedsWAL() to check if WAL-logging is needed in this transaction
+ *       for the given block.
+ *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -56,6 +78,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -2356,12 +2379,6 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * The new tuple is stamped with current transaction ID and the specified
  * command ID.
  *
- * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
- * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
- * requires that we arrange that all new tuples go into new pages not
- * containing any tuples from other transactions, and that the relation gets
- * fsync'd before commit.  (See also heap_sync() comments)
- *
  * The HEAP_INSERT_SKIP_FSM option is passed directly to
  * RelationGetBufferForTuple, which see for more info.
  *
@@ -2465,7 +2482,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
-       if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer))
        {
                xl_heap_insert xlrec;
                xl_heap_header xlhdr;
@@ -2664,12 +2681,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
        int                     ndone;
        char       *scratch = NULL;
        Page            page;
-       bool            needwal;
        Size            saveFreeSpace;
        bool            need_tuple_data = RelationIsLogicallyLogged(relation);
        bool            need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
-       needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
        saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                                                                   HEAP_DEFAULT_FILLFACTOR);
 
@@ -2684,7 +2699,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
         * palloc() within a critical section is not safe, so we allocate this
         * beforehand.
         */
-       if (needwal)
+       if (RelationNeedsWAL(relation))
                scratch = palloc(BLCKSZ);
 
        /*
@@ -2719,6 +2734,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
                Buffer          vmbuffer = InvalidBuffer;
                bool            all_visible_cleared = false;
                int                     nthispage;
+               bool            needwal;
 
                CHECK_FOR_INTERRUPTS();
 
@@ -2730,6 +2746,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
                                                                                   InvalidBuffer, options, bistate,
                                                                                   &vmbuffer, NULL);
                page = BufferGetPage(buffer);
+               needwal = BufferNeedsWAL(relation, buffer);
 
                /* NO EREPORT(ERROR) from here till changes are logged */
                START_CRIT_SECTION();
@@ -3286,7 +3303,7 @@ l1:
         * NB: heap_abort_speculative() uses the same xlog record and replay
         * routines.
         */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer))
        {
                xl_heap_delete xlrec;
                XLogRecPtr      recptr;
@@ -4250,7 +4267,8 @@ l2:
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer) ||
+               BufferNeedsWAL(relation, newbuf))
        {
                XLogRecPtr      recptr;
 
@@ -5141,7 +5159,7 @@ failed:
         * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
         * entries for everything anyway.)
         */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, *buffer))
        {
                xl_heap_lock xlrec;
                XLogRecPtr      recptr;
@@ -5843,7 +5861,7 @@ l4:
                MarkBufferDirty(buf);
 
                /* XLOG stuff */
-               if (RelationNeedsWAL(rel))
+               if (BufferNeedsWAL(rel, buf))
                {
                        xl_heap_lock_updated xlrec;
                        XLogRecPtr      recptr;
@@ -5998,7 +6016,7 @@ heap_finish_speculative(Relation relation, HeapTuple tuple)
        htup->t_ctid = tuple->t_self;
 
        /* XLOG stuff */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer))
        {
                xl_heap_confirm xlrec;
                XLogRecPtr      recptr;
@@ -6131,7 +6149,7 @@ heap_abort_speculative(Relation relation, HeapTuple tuple)
         * The WAL records generated here match heap_delete().  The same recovery
         * routines are used.
         */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer))
        {
                xl_heap_delete xlrec;
                XLogRecPtr      recptr;
@@ -6240,7 +6258,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
-       if (RelationNeedsWAL(relation))
+       if (BufferNeedsWAL(relation, buffer))
        {
                xl_heap_inplace xlrec;
                XLogRecPtr      recptr;
@@ -7354,7 +7372,7 @@ log_heap_clean(Relation reln, Buffer buffer,
        XLogRecPtr      recptr;
 
        /* Caller should not call me on a non-WAL-logged relation */
-       Assert(RelationNeedsWAL(reln));
+       Assert(BufferNeedsWAL(reln, buffer));
 
        xlrec.latestRemovedXid = latestRemovedXid;
        xlrec.nredirected = nredirected;
@@ -7402,7 +7420,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
        XLogRecPtr      recptr;
 
        /* Caller should not call me on a non-WAL-logged relation */
-       Assert(RelationNeedsWAL(reln));
+       Assert(BufferNeedsWAL(reln, buffer));
        /* nor when there are no tuples to freeze */
        Assert(ntuples > 0);
 
@@ -7487,7 +7505,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
        int                     bufflags;
 
        /* Caller should not call me on a non-WAL-logged relation */
-       Assert(RelationNeedsWAL(reln));
+       Assert(BufferNeedsWAL(reln, newbuf) || BufferNeedsWAL(reln, oldbuf));
 
        XLogBeginInsert();
 
@@ -7590,76 +7608,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
        xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
        xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
 
+       XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
+
        bufflags = REGBUF_STANDARD;
        if (init)
                bufflags |= REGBUF_WILL_INIT;
        if (need_tuple_data)
                bufflags |= REGBUF_KEEP_DATA;
 
-       XLogRegisterBuffer(0, newbuf, bufflags);
-       if (oldbuf != newbuf)
-               XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
-
-       XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
-
        /*
         * Prepare WAL data for the new tuple.
         */
-       if (prefixlen > 0 || suffixlen > 0)
+       if (BufferNeedsWAL(reln, newbuf))
        {
-               if (prefixlen > 0 && suffixlen > 0)
-               {
-                       prefix_suffix[0] = prefixlen;
-                       prefix_suffix[1] = suffixlen;
-                       XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2);
-               }
-               else if (prefixlen > 0)
-               {
-                       XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
-               }
-               else
-               {
-                       XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
-               }
-       }
+               XLogRegisterBuffer(0, newbuf, bufflags);
 
-       xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-       xlhdr.t_infomask = newtup->t_data->t_infomask;
-       xlhdr.t_hoff = newtup->t_data->t_hoff;
-       Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
+               if ((prefixlen > 0 || suffixlen > 0))
+               {
+                       if (prefixlen > 0 && suffixlen > 0)
+                       {
+                               prefix_suffix[0] = prefixlen;
+                               prefix_suffix[1] = suffixlen;
+                       XLogRegisterBufData(0, (char *) &prefix_suffix,
+                                                               sizeof(uint16) * 2);
+                       }
+                       else if (prefixlen > 0)
+                       {
+                               XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
+                       }
+                       else
+                       {
+                               XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
+                       }
+               }
+
+               xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
+               xlhdr.t_infomask = newtup->t_data->t_infomask;
+               xlhdr.t_hoff = newtup->t_data->t_hoff;
+               Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
 
-       /*
-        * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
-        *
-        * The 'data' doesn't include the common prefix or suffix.
-        */
-       XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
-       if (prefixlen == 0)
-       {
-               XLogRegisterBufData(0,
-                                                       ((char *) newtup->t_data) + SizeofHeapTupleHeader,
-                                                 newtup->t_len - SizeofHeapTupleHeader - suffixlen);
-       }
-       else
-       {
                /*
-                * Have to write the null bitmap and data after the common prefix as
-                * two separate rdata entries.
+                * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
+                *
+                * The 'data' doesn't include the common prefix or suffix.
                 */
-               /* bitmap [+ padding] [+ oid] */
-               if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+               XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+               if (prefixlen == 0)
                {
                        XLogRegisterBufData(0,
                                                   ((char *) newtup->t_data) + SizeofHeapTupleHeader,
-                                                        newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+                                                 newtup->t_len - SizeofHeapTupleHeader - suffixlen);
                }
+               else
+               {
+                       /*
+                        * Have to write the null bitmap and data after the common prefix
+                        * as two separate rdata entries.
+                        */
+                       /* bitmap [+ padding] [+ oid] */
+                       if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+                       {
+                               XLogRegisterBufData(0,
+                                                  ((char *) newtup->t_data) + SizeofHeapTupleHeader,
+                                                        newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+                       }
 
-               /* data after common prefix */
-               XLogRegisterBufData(0,
+                       /* data after common prefix */
+                       XLogRegisterBufData(0,
                          ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen,
                         newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen);
+               }
        }
 
+       /*
+        * If the old and new tuple are on different pages, also register the old
+        * page, so that a full-page image is created for it if necessary. We
+        * don't need any extra information to replay changes to it.
+        */
+       if (oldbuf != newbuf && BufferNeedsWAL(reln, oldbuf))
+               XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
+
        /* We need to log a tuple identity */
        if (need_tuple_data && old_key_tuple)
        {
@@ -8578,8 +8606,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
         */
 
        /* Deal with old tuple version */
-       oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-                                                                         &obuffer);
+       if (oldblk == newblk)
+               oldaction = XLogReadBufferForRedo(record, 0, &obuffer);
+       else if (XLogRecHasBlockRef(record, 1))
+               oldaction = XLogReadBufferForRedo(record, 1, &obuffer);
+       else
+               oldaction = BLK_DONE;
+
        if (oldaction == BLK_NEEDS_REDO)
        {
                page = BufferGetPage(obuffer);
@@ -8633,6 +8666,8 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
                PageInit(page, BufferGetPageSize(nbuffer), 0);
                newaction = BLK_NEEDS_REDO;
        }
+       else if (!XLogRecHasBlockRef(record, 0))
+               newaction = BLK_DONE;
        else
                newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
 
@@ -9069,9 +9104,16 @@ heap2_redo(XLogReaderState *record)
  *     heap_sync               - sync a heap, for use when no WAL has been written
  *
  * This forces the heap contents (including TOAST heap if any) down to disk.
- * If we skipped using WAL, and WAL is otherwise needed, we must force the
- * relation down to disk before it's safe to commit the transaction.  This
- * requires writing out any dirty buffers and then doing a forced fsync.
+ * If we made any changes to the heap bypassing the buffer manager, we must
+ * force the relation down to disk before it's safe to commit the
+ * transaction, because the direct modifications will not be flushed by
+ * the next checkpoint.
+ *
+ * We used to also use this after batch operations like COPY and CLUSTER,
+ * if we skipped using WAL and WAL is otherwise needed, but there were
+ * corner-cases involving other WAL-logged operations to the same
+ * relation, where that was not enough. heap_register_sync() should be
+ * used for that purpose instead.
  *
  * Indexes are not touched.  (Currently, index operations associated with
  * the commands that use this are WAL-logged and so do not need fsync.
@@ -9181,3 +9223,33 @@ heap_mask(char *pagedata, BlockNumber blkno)
                }
        }
 }
+
+/*
+ *     heap_register_sync      - register a heap to be synced to disk at commit
+ *
+ * This can be used to skip WAL-logging changes on a relation file that has
+ * been created in the same transaction. This makes note of the current size of
+ * the relation, and ensures that when the relation is extended, any changes
+ * to the new blocks in the heap, in the same transaction, will not be
+ * WAL-logged. Instead, the heap contents are flushed to disk at commit,
+ * like heap_sync() does.
+ *
+ * This does the same for the TOAST heap, if any. Indexes are not affected.
+ */
+void
+heap_register_sync(Relation rel)
+{
+       /* non-WAL-logged tables never need fsync */
+       if (!RelationNeedsWAL(rel))
+               return;
+
+       RecordPendingSync(rel);
+       if (OidIsValid(rel->rd_rel->reltoastrelid))
+       {
+               Relation        toastrel;
+
+               toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
+               RecordPendingSync(toastrel);
+               heap_close(toastrel, AccessShareLock);
+       }
+}
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index d69a266..4754278 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -260,7 +261,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                /*
                 * Emit a WAL HEAP_CLEAN record showing what we did
                 */
-               if (RelationNeedsWAL(relation))
+               if (BufferNeedsWAL(relation, buffer))
                {
                        XLogRecPtr      recptr;
 
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..6462f44 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -649,9 +649,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
        }
        else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
                heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
-                                                                                HEAP_INSERT_SKIP_FSM |
-                                                                                (state->rs_use_wal ?
-                                                                                 0 : HEAP_INSERT_SKIP_WAL));
+                                                                                HEAP_INSERT_SKIP_FSM);
        else
                heaptup = tup;
 
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e5616ce..933fa9c 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -88,6 +88,7 @@
 #include "access/heapam_xlog.h"
 #include "access/visibilitymap.h"
 #include "access/xlog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -307,7 +308,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
                map[mapByte] |= (flags << mapOffset);
                MarkBufferDirty(vmBuf);
 
-               if (RelationNeedsWAL(rel))
+               if (BufferNeedsWAL(rel, heapBuf))
                {
                        if (XLogRecPtrIsInvalid(recptr))
                        {
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index f677916..929b5a0 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -254,11 +254,15 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
         * trouble if the truncation fails. If we then crash, the WAL replay
         * likely isn't going to succeed in the truncation either, and cause a
         * PANIC. It's tempting to put a critical section here, but that cure
-        * would be worse than the disease. It would turn a usually harmless
+        * would be worse than the disease: it would turn a usually harmless
         * failure to truncate, that might spell trouble at WAL replay, into a
         * certain PANIC.
+        *
+        * XXX Explain why we skip this sometimes.
         */
-       if (RelationNeedsWAL(rel))
+       if (RelationNeedsWAL(rel) &&
+               (rel->sync_above == InvalidBlockNumber ||
+                rel->sync_above < nblocks))
        {
                /*
                 * Make an XLOG entry reporting the file truncation.
@@ -268,7 +272,6 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 
                xlrec.blkno = nblocks;
                xlrec.rnode = rel->rd_node;
-               xlrec.flags = SMGR_TRUNCATE_ALL;
 
                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, sizeof(xlrec));
@@ -276,6 +279,10 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
                lsn = XLogInsert(RM_SMGR_ID,
                                                 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
 
+               elog(DEBUG2, "WAL-logged truncation of rel %u/%u/%u to %u blocks",
+                        rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+                        nblocks);
+
                /*
                 * Flush, because otherwise the truncation of the main relation might
                 * hit the disk before the WAL record, and the truncation of the FSM
@@ -285,6 +292,8 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
                 */
                if (fsm || vm)
                        XLogFlush(lsn);
+
+               rel->truncated_to = nblocks;
        }
 
        /* Do the real work */
@@ -420,6 +429,72 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 }
 
 /*
+ * RecordPendingSync
+ *             Make note that we need to sync buffers above the current relation size.
+ *
+ * (Thus, any operation that writes buffers above the current size can be
+ * optimized as not needing WAL; a relation sync will automatically be executed
+ * at transaction commit.)
+ */
+void
+RecordPendingSync(Relation rel)
+{
+       Assert(RelationNeedsWAL(rel));
+
+       if (rel->sync_above == InvalidBlockNumber)
+       {
+               elog(DEBUG2, "registering pending sync for rel %u/%u/%u at block %u",
+                        rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+                        RelationGetNumberOfBlocks(rel));
+               rel->sync_above = RelationGetNumberOfBlocks(rel);
+       }
+       else
+               elog(DEBUG2, "pending sync for rel %u/%u/%u was already registered at block %u (new %u)",
+                        rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+                        rel->sync_above, RelationGetNumberOfBlocks(rel));
+}
+
+/*
+ * BufferNeedsWAL
+ *             Return whether changes to the given buffer need to be
+ *             WAL-logged.
+ */
+bool
+BufferNeedsWAL(Relation rel, Buffer buf)
+{
+       BlockNumber blkno = InvalidBlockNumber;
+
+       if (!RelationNeedsWAL(rel))
+               return false;
+
+       blkno = BufferGetBlockNumber(buf);
+       if (rel->sync_above == InvalidBlockNumber ||
+                rel->sync_above > blkno)
+       {
+               elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because sync_above is %u",
+                        rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+                        blkno, rel->sync_above);
+               return true;
+       }
+
+       /*
+        * We have emitted a truncation record for this block.
+        */
+       if (rel->truncated_to != InvalidBlockNumber &&
+               rel->truncated_to <= blkno)
+       {
+               elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because it was truncated earlier in the same xact",
+                        rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode, blkno);
+               return true;
+       }
+
+       elog(DEBUG2, "skipping WAL-logging for rel %u/%u/%u block %u",
+                rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode, blkno);
+
+       return false;
+}
+
+/*
  *     PostPrepare_smgr -- Clean up after a successful PREPARE
  *
  * What we have to do here is throw away the in-memory state about pending
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8c58808..cb9df1b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2372,8 +2372,7 @@ CopyFrom(CopyState cstate)
         *      - data is being written to relfilenode created in this transaction
         * then we can skip writing WAL.  It's safe because if the transaction
         * doesn't commit, we'll discard the table (or the new relfilenode file).
-        * If it does commit, we'll have done the heap_sync at the bottom of this
-        * routine first.
+        * If it does commit, commit will do heap_sync().
         *
         * As mentioned in comments in utils/rel.h, the in-same-transaction test
         * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
@@ -2405,7 +2404,7 @@ CopyFrom(CopyState cstate)
        {
                hi_options |= HEAP_INSERT_SKIP_FSM;
                if (!XLogIsNeeded())
-                       hi_options |= HEAP_INSERT_SKIP_WAL;
+                       heap_register_sync(cstate->rel);
        }
 
        /*
@@ -2784,11 +2783,11 @@ CopyFrom(CopyState cstate)
        FreeExecutorState(estate);
 
        /*
-        * If we skipped writing WAL, then we need to sync the heap (but not
-        * indexes since those use WAL anyway)
+        * If we skipped writing WAL, then we will sync the heap at the end of
+        * the transaction. (We used to do it here, but it was later found that,
+        * to be safe, we must also avoid WAL-logging any subsequent actions on
+        * the pages we skipped WAL for.) Indexes always use WAL.
         */
-       if (hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(cstate->rel);
 
        return processed;
 }
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index f49b391..7710f82 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -567,8 +567,9 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
         * We can skip WAL-logging the insertions, unless PITR or streaming
         * replication is in use. We can skip the FSM in any case.
         */
-       myState->hi_options = HEAP_INSERT_SKIP_FSM |
-               (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+       if (!XLogIsNeeded())
+               heap_register_sync(intoRelationDesc);
+       myState->hi_options = HEAP_INSERT_SKIP_FSM;
        myState->bistate = GetBulkInsertState();
 
        /* Not using WAL requires smgr_targblock be initially invalid */
@@ -617,9 +618,7 @@ intorel_shutdown(DestReceiver *self)
 
        FreeBulkInsertState(myState->bistate);
 
-       /* If we skipped using WAL, must heap_sync before commit */
-       if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(myState->rel);
+       /* If we skipped using WAL, we will sync the relation at commit */
 
        /* close rel, but keep lock until commit */
        heap_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 2f93328..514012b 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -477,7 +477,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
         */
        myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
        if (!XLogIsNeeded())
-               myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+               heap_register_sync(transientrel);
        myState->bistate = GetBulkInsertState();
 
        /* Not using WAL requires smgr_targblock be initially invalid */
@@ -520,9 +520,7 @@ transientrel_shutdown(DestReceiver *self)
 
        FreeBulkInsertState(myState->bistate);
 
-       /* If we skipped using WAL, must heap_sync before commit */
-       if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(myState->transientrel);
+       /* If we skipped using WAL, we will sync the relation at commit */
 
        /* close transientrel, but keep lock until commit */
        heap_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 60f8b7f..9b14053 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4327,8 +4327,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
                bistate = GetBulkInsertState();
 
                hi_options = HEAP_INSERT_SKIP_FSM;
+
                if (!XLogIsNeeded())
-                       hi_options |= HEAP_INSERT_SKIP_WAL;
+                       heap_register_sync(newrel);
        }
        else
        {
@@ -4589,8 +4590,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
                FreeBulkInsertState(bistate);
 
                /* If we skipped writing WAL, then we need to sync the heap. */
-               if (hi_options & HEAP_INSERT_SKIP_WAL)
-                       heap_sync(newrel);
 
                heap_close(newrel, NoLock);
        }
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 5b43a66..f3dcf6e 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -893,7 +893,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
                                 * page has been previously WAL-logged, and if not, do that
                                 * now.
                                 */
-                               if (RelationNeedsWAL(onerel) &&
+                               if (BufferNeedsWAL(onerel, buf) &&
                                        PageGetLSN(page) == InvalidXLogRecPtr)
                                        log_newpage_buffer(buf, true);
 
@@ -1120,7 +1120,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
                        }
 
                        /* Now WAL-log freezing if necessary */
-                       if (RelationNeedsWAL(onerel))
+                       if (BufferNeedsWAL(onerel, buf))
                        {
                                XLogRecPtr      recptr;
 
@@ -1480,7 +1480,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
-       if (RelationNeedsWAL(onerel))
+       if (BufferNeedsWAL(onerel, buffer))
        {
                XLogRecPtr      recptr;
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2109cbf..f7c2b16 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -451,6 +451,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
                        BufferAccessStrategy strategy,
                        bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
+static void FlushRelationBuffers_common(SMgrRelation smgr, bool islocal);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int     rnode_comparator(const void *p1, const void *p2);
@@ -3147,20 +3148,41 @@ PrintPinnedBufs(void)
 void
 FlushRelationBuffers(Relation rel)
 {
-       int                     i;
-       BufferDesc *bufHdr;
-
        /* Open rel at the smgr level if not already done */
        RelationOpenSmgr(rel);
 
-       if (RelationUsesLocalBuffers(rel))
+       FlushRelationBuffers_common(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+/*
+ * Like FlushRelationBuffers(), but the relation is specified by a
+ * RelFileNode
+ */
+void
+FlushRelationBuffersWithoutRelcache(RelFileNode rnode, bool islocal)
+{
+       FlushRelationBuffers_common(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+/*
+ * Code shared between functions FlushRelationBuffers() and
+ * FlushRelationBuffersWithoutRelcache().
+ */
+static void
+FlushRelationBuffers_common(SMgrRelation smgr, bool islocal)
+{
+       RelFileNode rnode = smgr->smgr_rnode.node;
+       int                     i;
+       BufferDesc *bufHdr;
+
+       if (islocal)
        {
                for (i = 0; i < NLocBuffer; i++)
                {
                        uint32          buf_state;
 
                        bufHdr = GetLocalBufferDescriptor(i);
-                       if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+                       if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
                                ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
                                 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
                        {
@@ -3177,7 +3199,7 @@ FlushRelationBuffers(Relation rel)
 
                                PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-                               smgrwrite(rel->rd_smgr,
+                               smgrwrite(smgr,
                                                  bufHdr->tag.forkNum,
                                                  bufHdr->tag.blockNum,
                                                  localpage,
@@ -3207,18 +3229,18 @@ FlushRelationBuffers(Relation rel)
                 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
                 * and saves some cycles.
                 */
-               if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+               if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
                        continue;
 
                ReservePrivateRefCountEntry();
 
                buf_state = LockBufHdr(bufHdr);
-               if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+               if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
                        (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
                {
                        PinBuffer_Locked(bufHdr);
                        LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-                       FlushBuffer(bufHdr, rel->rd_smgr);
+                       FlushBuffer(bufHdr, smgr);
                        LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
                        UnpinBuffer(bufHdr, true);
                }
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index ddb9485..11913f9 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -72,6 +72,7 @@
 #include "optimizer/var.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rowsecurity.h"
+#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/array.h"
@@ -418,6 +419,9 @@ AllocateRelationDesc(Form_pg_class relp)
        /* which we mark as a reference-counted tupdesc */
        relation->rd_att->tdrefcount = 1;
 
+       relation->sync_above = InvalidBlockNumber;
+       relation->truncated_to = InvalidBlockNumber;
+
        MemoryContextSwitchTo(oldcxt);
 
        return relation;
@@ -2032,6 +2036,9 @@ formrdesc(const char *relationName, Oid relationReltype,
                relation->rd_rel->relhasindex = true;
        }
 
+       relation->sync_above = InvalidBlockNumber;
+       relation->truncated_to = InvalidBlockNumber;
+
        /*
         * add new reldesc to relcache
         */
@@ -2366,6 +2373,24 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
 }
 
 /*
+ * If this relation has a pending flush request, execute it.
+ */
+static void
+RelationDoPendingFlush(Relation relation)
+{
+       if (relation->sync_above != InvalidBlockNumber)
+       {
+               FlushRelationBuffersWithoutRelcache(relation->rd_node, false);
+               smgrimmedsync(smgropen(relation->rd_node, InvalidBackendId),
+                                         MAIN_FORKNUM);
+
+               elog(DEBUG2, "syncing rel %u/%u/%u",
+                        relation->rd_node.spcNode,
+                        relation->rd_node.dbNode, relation->rd_node.relNode);
+       }
+}
+
+/*
  * RelationClearRelation
  *
  *      Physically blow away a relation cache entry, or reset it and rebuild
@@ -3015,7 +3040,10 @@ AtEOXact_cleanup(Relation relation, bool isCommit)
        if (relation->rd_createSubid != InvalidSubTransactionId)
        {
                if (isCommit)
+               {
+                       RelationDoPendingFlush(relation);
                        relation->rd_createSubid = InvalidSubTransactionId;
+               }
                else if (RelationHasReferenceCountZero(relation))
                {
                        RelationClearRelation(relation, false);
@@ -3353,6 +3381,9 @@ RelationBuildLocalRelation(const char *relname,
        else
                rel->rd_rel->relfilenode = relfilenode;
 
+       rel->sync_above = InvalidBlockNumber;
+       rel->truncated_to = InvalidBlockNumber;
+
        RelationInitLockInfo(rel);      /* see lmgr.c */
 
        RelationInitPhysicalAddr(rel);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 7e85510..aa069a5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,7 +25,7 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL   0x0001
+/* 0x0001 is free */
 #define HEAP_INSERT_SKIP_FSM   0x0002
 #define HEAP_INSERT_FROZEN             0x0004
 #define HEAP_INSERT_SPECULATIVE 0x0008
@@ -178,6 +178,7 @@ extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
                                   HeapTuple tup);
 
+extern void heap_register_sync(Relation relation);
 extern void heap_sync(Relation relation);
 extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index fea96de..415b98a 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -29,6 +29,8 @@ extern void RelationTruncate(Relation rel, BlockNumber nblocks);
  */
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int     smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void RecordPendingSync(Relation rel);
+extern bool BufferNeedsWAL(Relation rel, Buffer buf);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 07a32d6..ac6f866 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -190,6 +190,8 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
                                                                ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelationBuffersWithoutRelcache(RelFileNode rnode,
+                                                                       bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode,
                                           ForkNumber forkNum, BlockNumber firstDelBlock);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index ab875bb..03244be 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -216,6 +216,10 @@ typedef struct RelationData
 
        /* use "struct" here to avoid needing to include pgstat.h: */
        struct PgStat_TableStatus *pgstat_info;         /* statistics collection area */
+
+       /* support for WAL-flush-skipping */
+       BlockNumber sync_above;
+       BlockNumber truncated_to;
 } RelationData;
 
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
