Hi,

Eeek, the previously attached patch included a trivial last-minute screwup
(dereferencing bistate unconditionally...). A fixed version is attached.

Andres
From fc095897a6f4207d384559a095f80a36cf49648c Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 29 Mar 2015 20:55:32 +0200
Subject: [PATCH] WIP: Saner heap extension.

---
 src/backend/access/heap/hio.c       |  86 ++++----
 src/backend/commands/vacuumlazy.c   |  39 ++--
 src/backend/storage/buffer/bufmgr.c | 377 ++++++++++++++++++++++++++----------
 src/backend/storage/smgr/md.c       |  62 ++++++
 src/backend/storage/smgr/smgr.c     |  20 +-
 src/include/storage/buf_internals.h |   1 +
 src/include/storage/bufmgr.h        |   1 +
 src/include/storage/smgr.h          |   7 +-
 8 files changed, 417 insertions(+), 176 deletions(-)

diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 6db73bf..b47f9fe 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -15,6 +15,8 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+
 #include "access/heapam.h"
 #include "access/hio.h"
 #include "access/htup_details.h"
@@ -237,7 +239,6 @@ RelationGetBufferForTuple(Relation relation, Size len,
 				saveFreeSpace;
 	BlockNumber targetBlock,
 				otherBlock;
-	bool		needLock;
 
 	len = MAXALIGN(len);		/* be conservative */
 
@@ -433,63 +434,50 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	/*
 	 * Have to extend the relation.
 	 *
-	 * We have to use a lock to ensure no one else is extending the rel at the
-	 * same time, else we will both try to initialize the same new page.  We
-	 * can skip locking for new or temp relations, however, since no one else
-	 * could be accessing them.
+	 * To avoid, as it used to be the case, holding the extension lock during
+	 * victim buffer search for the new buffer, we extend the relation here
+	 * instead of relying on bufmgr.c. We still have to hold the extension
+	 * lock to prevent a race between two backends initializing the same page.
 	 */
-	needLock = !RELATION_IS_LOCAL(relation);
-
-	if (needLock)
-		LockRelationForExtension(relation, ExclusiveLock);
+	while(true)
+	{
+		buffer = ExtendRelation(relation, MAIN_FORKNUM, bistate->strategy);
 
-	/*
-	 * XXX This does an lseek - rather expensive - but at the moment it is the
-	 * only way to accurately determine how many blocks are in a relation.  Is
-	 * it worth keeping an accurate file length in shared memory someplace,
-	 * rather than relying on the kernel to do it for us?
-	 */
-	buffer = ReadBufferBI(relation, P_NEW, bistate);
+		if (otherBuffer != InvalidBuffer)
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 
-	/*
-	 * We can be certain that locking the otherBuffer first is OK, since it
-	 * must have a lower page number.
-	 */
-	if (otherBuffer != InvalidBuffer)
-		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+		/*
+		 * Now acquire lock on the new page.
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-	/*
-	 * Now acquire lock on the new page.
-	 */
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		page = BufferGetPage(buffer);
 
-	/*
-	 * Release the file-extension lock; it's now OK for someone else to extend
-	 * the relation some more.  Note that we cannot release this lock before
-	 * we have buffer lock on the new page, or we risk a race condition
-	 * against vacuumlazy.c --- see comments therein.
-	 */
-	if (needLock)
-		UnlockRelationForExtension(relation, ExclusiveLock);
+		/*
+		 * While unlikely, it's possible that another backend managed to
+		 * initialize the page and use up the free space before we got the
+		 * exclusive lock. That'd require the page to be vacuumed (to be put
+		 * on the free space list) and then be used; possible but fairly
+		 * unlikely in practice. If it happens and there's not enough space,
+		 * just retry.
+		 */
+		if (PageIsNew(page))
+		{
+			PageInit(page, BLCKSZ, 0);
 
-	/*
-	 * We need to initialize the empty new page.  Double-check that it really
-	 * is empty (this should never happen, but if it does we don't want to
-	 * risk wiping out valid data).
-	 */
-	page = BufferGetPage(buffer);
+			Assert(len <= PageGetHeapFreeSpace(page));
+			break;
+		}
+		else if (len <= PageGetHeapFreeSpace(page))
+			break;
 
-	if (!PageIsNew(page))
-		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
-			 BufferGetBlockNumber(buffer),
-			 RelationGetRelationName(relation));
+		if (otherBuffer != InvalidBuffer)
+			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
 
-	PageInit(page, BufferGetPageSize(buffer), 0);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+		ReleaseBuffer(buffer);
 
-	if (len > PageGetHeapFreeSpace(page))
-	{
-		/* We should not get here given the test at the top */
-		elog(PANIC, "tuple is too big: size %zu", len);
+		CHECK_FOR_INTERRUPTS();
 	}
 
 	/*
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index a01cfb4..896731c 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -674,35 +674,18 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			/*
 			 * An all-zeroes page could be left over if a backend extends the
 			 * relation but crashes before initializing the page. Reclaim such
-			 * pages for use.
-			 *
-			 * We have to be careful here because we could be looking at a
-			 * page that someone has just added to the relation and not yet
-			 * been able to initialize (see RelationGetBufferForTuple). To
-			 * protect against that, release the buffer lock, grab the
-			 * relation extension lock momentarily, and re-lock the buffer. If
-			 * the page is still uninitialized by then, it must be left over
-			 * from a crashed backend, and we can initialize it.
-			 *
-			 * We don't really need the relation lock when this is a new or
-			 * temp relation, but it's probably not worth the code space to
-			 * check that, since this surely isn't a critical path.
-			 *
-			 * Note: the comparable code in vacuum.c need not worry because
-			 * it's got exclusive lock on the whole relation.
+			 * pages for use.  It is also possible that we're looking at a
+			 * page that has just been added but not yet initialized (see
+			 * RelationGetBufferForTuple). In that case we just initialize the
+			 * page here. That means the page will end up in the free space
+			 * map a little earlier, but that seems fine.
 			 */
-			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-			LockRelationForExtension(onerel, ExclusiveLock);
-			UnlockRelationForExtension(onerel, ExclusiveLock);
-			LockBufferForCleanup(buf);
-			if (PageIsNew(page))
-			{
-				ereport(WARNING,
-				(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
-						relname, blkno)));
-				PageInit(page, BufferGetPageSize(buf), 0);
-				empty_pages++;
-			}
+			ereport(DEBUG2,
+					(errmsg("relation \"%s\" page %u is uninitialized --- fixing",
+							relname, blkno)));
+			PageInit(page, BufferGetPageSize(buf), 0);
+			empty_pages++;
+
 			freespace = PageGetHeapFreeSpace(page);
 			MarkBufferDirty(buf);
 			UnlockReleaseBuffer(buf);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e4b25587..4613666 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -392,6 +392,7 @@ static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
 				  ForkNumber forkNum, BlockNumber blockNum,
 				  ReadBufferMode mode, BufferAccessStrategy strategy,
 				  bool *hit);
+static volatile BufferDesc *GetVictimBuffer(BufferAccessStrategy strategy, BufFlags *oldFlags);
 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(volatile BufferDesc *buf);
 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
@@ -483,6 +484,176 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
 #endif   /* USE_PREFETCH */
 }
 
+Buffer
+ExtendRelation(Relation reln, ForkNumber forkNum, BufferAccessStrategy strategy)
+{
+	BlockNumber	blockno;		/* block we try to claim; starts at smgrnblocks() */
+	Buffer		buf_id;			/* BufTableInsert() result; >= 0 means tag already mapped */
+	volatile BufferDesc *buf;
+	BufFlags	oldFlags;
+	Block		bufBlock;
+	bool		isLocalBuf = RelationUsesLocalBuffers(reln);
+	int			readblocks;		/* return value of smgrtryread(), compared to BLCKSZ */
+
+	BufferTag	oldTag;			/* previous identity of selected buffer */
+	uint32		oldHash;		/* hash value for oldTag */
+	LWLock	   *oldPartitionLock;		/* buffer partition lock for it */
+
+	BufferTag	newTag;			/* identity of the block being added */
+	uint32		newHash;		/* hash value for newTag */
+	LWLock	   *newPartitionLock;		/* buffer partition lock for it */
+	/* Extends the fork by one zero-filled block; the shared-buffer path returns it pinned. */
+	/* FIXME: local buffers still go through P_NEW; this obviously isn't acceptable for integration */
+	if (isLocalBuf)
+	{
+		return ReadBufferExtended(reln, forkNum, P_NEW, RBM_NORMAL, strategy);
+	}
+
+	/* Open it at the smgr level if not already done */
+	RelationOpenSmgr(reln);
+
+	/* Make sure we will have room to remember the buffer pin */
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+retry_victim:
+	/* we'll need a clean unassociated victim buffer; loop until we own one */
+	while (true)
+	{
+		bool		gotIt = false;
+
+		/*
+		 * Returns a buffer that was unpinned and not dirty at the time of the
+		 * check.
+		 */
+		buf = GetVictimBuffer(strategy, &oldFlags);
+
+		if (oldFlags & BM_TAG_VALID)
+		{
+			oldTag = buf->tag;
+			oldHash = BufTableHashCode(&oldTag);
+			oldPartitionLock = BufMappingPartitionLock(oldHash);
+			LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
+		}
+
+		LockBufHdr(buf);
+
+		/* somebody else might have re-pinned or dirtied the buffer by now */
+		if (buf->refcount != 1 || (buf->flags & BM_DIRTY))
+		{
+			UnlockBufHdr(buf);
+		}
+		else
+		{
+			buf->flags &= ~(BM_TAG_VALID | BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | BM_NEW);
+
+			UnlockBufHdr(buf);
+
+			gotIt = true;
+
+			if (oldFlags & BM_TAG_VALID)
+				BufTableDelete(&oldTag, oldHash);
+		}
+
+		if (oldFlags & BM_TAG_VALID)
+			LWLockRelease(oldPartitionLock);
+
+		if (gotIt)
+			break;
+		else
+			UnpinBuffer(buf, true);
+	}
+
+	/*
+	 * At this stage we have an empty victim buffer, pinned to prevent it from
+	 * being reused.
+	 */
+
+	/*
+	 * First try the current end of the relation. If a concurrent process has
+	 * acquired that, try the next one after that.
+	 */
+	blockno = smgrnblocks(reln->rd_smgr, forkNum);
+
+	while (true)
+	{
+		INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node, forkNum, blockno);
+
+		newHash = BufTableHashCode(&newTag);
+		newPartitionLock = BufMappingPartitionLock(newHash);
+		LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
+
+		buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
+
+		if (buf_id >= 0)
+		{
+			/* somebody else got this block, try the next one */
+			LWLockRelease(newPartitionLock);
+			blockno++;
+			continue;
+		}
+
+		LockBufHdr(buf);
+
+		buf->tag = newTag;
+		if (reln->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)
+			buf->flags |= BM_NEW | BM_TAG_VALID | BM_PERMANENT;
+		else
+			buf->flags |= BM_NEW | BM_TAG_VALID;
+		buf->usage_count = 1;
+
+		UnlockBufHdr(buf);
+		LWLockRelease(newPartitionLock);
+
+		break;
+	}
+
+	/*
+	 * By here we have made an entry in the buffer table, but haven't yet
+	 * read/written the page.  We can't just initialize the page: while we
+	 * were busy with the above, another backend could have extended the
+	 * relation, written something, and the buffer could already have been
+	 * reused for something else.
+	 */
+
+	if (!StartBufferIO(buf, true))
+	{
+		/*
+		 * Somebody else is already using this block. Just try another one.
+		 */
+		UnpinBuffer(buf, true);
+		goto retry_victim;
+	}
+
+	/*
+	 * FIXME: if we die here we might have a problem: Everyone trying to read
+	 * this block will get a failure. Need to add checks for BM_NEW against
+	 * that. That's not really new to this code tho.
+	 */
+	/* NOTE(review): isLocalBuf is always false here; the local case returned above */
+	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(buf) : BufHdrGetBlock(buf);
+
+	readblocks = smgrtryread(reln->rd_smgr, forkNum, blockno, bufBlock);
+
+	if (readblocks != BLCKSZ)
+	{
+		MemSet((char *) bufBlock, 0, BLCKSZ);
+
+		smgrextend(reln->rd_smgr, forkNum, blockno, (char *) bufBlock, false);
+
+		/* Set BM_VALID, terminate IO, and wake up any waiters */
+		TerminateBufferIO(buf, false, BM_VALID);
+	}
+	else
+	{
+		/* A full block was read, so it already exists: mark valid, unpin, and start over */
+		TerminateBufferIO(buf, false, BM_VALID);
+		UnpinBuffer(buf, true);
+
+		goto retry_victim;
+	}
+
+	return BufferDescriptorGetBuffer(buf);
+}
 
 /*
  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
@@ -847,6 +1018,112 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+static volatile BufferDesc *
+GetVictimBuffer(BufferAccessStrategy strategy, BufFlags *oldFlags)
+{
+	volatile BufferDesc *buf;
+
+	/* Returns a pinned victim, written out first if dirty; *oldFlags = its flags at pick time. */
+retry:
+	/*
+	 * Ensure, while the spinlock's not yet held, that there's a free refcount
+	 * entry.  Redone on every retry, as the loop this was factored out of did.
+	 */
+	ReservePrivateRefCountEntry();
+	/*
+	 * Select a victim buffer.  The buffer is returned with its header
+	 * spinlock still held!
+	 */
+	buf = StrategyGetBuffer(strategy);
+
+	Assert(buf->refcount == 0);
+
+	/* Must copy buffer flags while we still hold the spinlock */
+	*oldFlags = buf->flags;
+
+	/* Pin the buffer and then release the buffer spinlock */
+	PinBuffer_Locked(buf);
+
+	/*
+	 * If the buffer was dirty, try to write it out.  There is a race
+	 * condition here, in that someone might dirty it after we released it
+	 * above, or even while we are writing it out (since our share-lock
+	 * won't prevent hint-bit updates).  The caller must recheck the dirty
+	 * bit after re-locking the buffer header.
+	 */
+	if (*oldFlags & BM_DIRTY)
+	{
+		/*
+		 * We need a share-lock on the buffer contents to write it out
+		 * (else we might write invalid data, eg because someone else is
+		 * compacting the page contents while we write).  We must use a
+		 * conditional lock acquisition here to avoid deadlock.  Even
+		 * though the buffer was not pinned (and therefore surely not
+		 * locked) when StrategyGetBuffer returned it, someone else could
+		 * have pinned and exclusive-locked it by the time we get here. If
+		 * we try to get the lock unconditionally, we'd block waiting for
+		 * them; if they later block waiting for us, deadlock ensues.
+		 * (This has been observed to happen when two backends are both
+		 * trying to split btree index pages, and the second one just
+		 * happens to be trying to split the page the first one got from
+		 * StrategyGetBuffer.)
+		 */
+		if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
+		{
+			/*
+			 * If using a nondefault strategy, and writing the buffer
+			 * would require a WAL flush, let the strategy decide whether
+			 * to go ahead and write/reuse the buffer or to choose another
+			 * victim.  We need lock to inspect the page LSN, so this
+			 * can't be done inside StrategyGetBuffer.
+			 */
+			if (strategy != NULL)
+			{
+				XLogRecPtr	lsn;
+
+				/* Read the LSN while holding buffer header lock */
+				LockBufHdr(buf);
+				lsn = BufferGetLSN(buf);
+				UnlockBufHdr(buf);
+
+				if (XLogNeedsFlush(lsn) &&
+					StrategyRejectBuffer(strategy, buf))
+				{
+					/* Drop lock/pin and loop around for another buffer */
+					LWLockRelease(buf->content_lock);
+					UnpinBuffer(buf, true);
+					goto retry;
+				}
+			}
+
+			/* OK, do the I/O.  Probes report the victim's own tag; no requested block is in scope here. */
+			TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(buf->tag.forkNum, buf->tag.blockNum,
+										   buf->tag.rnode.spcNode,
+											buf->tag.rnode.dbNode,
+										  buf->tag.rnode.relNode);
+
+			FlushBuffer(buf, NULL);
+			LWLockRelease(buf->content_lock);
+
+			TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(buf->tag.forkNum, buf->tag.blockNum,
+										   buf->tag.rnode.spcNode,
+											buf->tag.rnode.dbNode,
+										  buf->tag.rnode.relNode);
+		}
+		else
+		{
+			/*
+			 * Someone else has locked the buffer, so give it up and loop
+			 * back to get another one.
+			 */
+			UnpinBuffer(buf, true);
+			goto retry;
+		}
+	}
+
+	return buf;
+}
+
 /*
  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
  *		buffer.  If no buffer exists already, selects a replacement
@@ -940,102 +1217,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	/* Loop here in case we have to try another victim buffer */
 	for (;;)
 	{
-		/*
-		 * Ensure, while the spinlock's not yet held, that there's a free
-		 * refcount entry.
-		 */
-		ReservePrivateRefCountEntry();
-
-		/*
-		 * Select a victim buffer.  The buffer is returned with its header
-		 * spinlock still held!
-		 */
-		buf = StrategyGetBuffer(strategy);
-
-		Assert(buf->refcount == 0);
-
-		/* Must copy buffer flags while we still hold the spinlock */
-		oldFlags = buf->flags;
-
-		/* Pin the buffer and then release the buffer spinlock */
-		PinBuffer_Locked(buf);
-
-		/*
-		 * If the buffer was dirty, try to write it out.  There is a race
-		 * condition here, in that someone might dirty it after we released it
-		 * above, or even while we are writing it out (since our share-lock
-		 * won't prevent hint-bit updates).  We will recheck the dirty bit
-		 * after re-locking the buffer header.
-		 */
-		if (oldFlags & BM_DIRTY)
-		{
-			/*
-			 * We need a share-lock on the buffer contents to write it out
-			 * (else we might write invalid data, eg because someone else is
-			 * compacting the page contents while we write).  We must use a
-			 * conditional lock acquisition here to avoid deadlock.  Even
-			 * though the buffer was not pinned (and therefore surely not
-			 * locked) when StrategyGetBuffer returned it, someone else could
-			 * have pinned and exclusive-locked it by the time we get here. If
-			 * we try to get the lock unconditionally, we'd block waiting for
-			 * them; if they later block waiting for us, deadlock ensues.
-			 * (This has been observed to happen when two backends are both
-			 * trying to split btree index pages, and the second one just
-			 * happens to be trying to split the page the first one got from
-			 * StrategyGetBuffer.)
-			 */
-			if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
-			{
-				/*
-				 * If using a nondefault strategy, and writing the buffer
-				 * would require a WAL flush, let the strategy decide whether
-				 * to go ahead and write/reuse the buffer or to choose another
-				 * victim.  We need lock to inspect the page LSN, so this
-				 * can't be done inside StrategyGetBuffer.
-				 */
-				if (strategy != NULL)
-				{
-					XLogRecPtr	lsn;
-
-					/* Read the LSN while holding buffer header lock */
-					LockBufHdr(buf);
-					lsn = BufferGetLSN(buf);
-					UnlockBufHdr(buf);
-
-					if (XLogNeedsFlush(lsn) &&
-						StrategyRejectBuffer(strategy, buf))
-					{
-						/* Drop lock/pin and loop around for another buffer */
-						LWLockRelease(buf->content_lock);
-						UnpinBuffer(buf, true);
-						continue;
-					}
-				}
-
-				/* OK, do the I/O */
-				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
-											   smgr->smgr_rnode.node.spcNode,
-												smgr->smgr_rnode.node.dbNode,
-											  smgr->smgr_rnode.node.relNode);
-
-				FlushBuffer(buf, NULL);
-				LWLockRelease(buf->content_lock);
-
-				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
-											   smgr->smgr_rnode.node.spcNode,
-												smgr->smgr_rnode.node.dbNode,
-											  smgr->smgr_rnode.node.relNode);
-			}
-			else
-			{
-				/*
-				 * Someone else has locked the buffer, so give it up and loop
-				 * back to get another one.
-				 */
-				UnpinBuffer(buf, true);
-				continue;
-			}
-		}
+		/* returns a nondirty buffer, with potentially valid contents */
+		buf = GetVictimBuffer(strategy, &oldFlags);
 
 		/*
 		 * To change the association of a valid buffer, we'll need to have
@@ -1171,7 +1354,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * 1 so that the buffer can survive one clock-sweep pass.)
 	 */
 	buf->tag = newTag;
-	buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+	buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | BM_NEW);
 	if (relpersistence == RELPERSISTENCE_PERMANENT)
 		buf->flags |= BM_TAG_VALID | BM_PERMANENT;
 	else
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 42a43bb..0038c91 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -729,6 +729,68 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+
+/*
+ *	mdtryread() -- Read the block if present; returns FileRead()'s byte count, or 0 if no segment holds it.
+ */
+int
+mdtryread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+	   char *buffer)
+{
+	off_t		seekpos;
+	int			nbytes;
+	MdfdVec    *v;
+
+	TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
+										reln->smgr_rnode.node.spcNode,
+										reln->smgr_rnode.node.dbNode,
+										reln->smgr_rnode.node.relNode,
+										reln->smgr_rnode.backend);
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_RETURN_NULL);
+
+	/* no segment for this block (would need another one): report "nothing read" */
+	if (v == NULL)
+		return 0;
+
+	seekpos = (off_t) BLCKSZ *(blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	if (FileSeek(v->mdfd_vfd, seekpos, SEEK_SET) != seekpos)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek to block %u in file \"%s\": %m",
+						blocknum, FilePathName(v->mdfd_vfd))));
+
+	nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ);
+
+	TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
+									   reln->smgr_rnode.node.spcNode,
+									   reln->smgr_rnode.node.dbNode,
+									   reln->smgr_rnode.node.relNode,
+									   reln->smgr_rnode.backend,
+									   nbytes,
+									   BLCKSZ);
+
+	if (nbytes < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read block %u in file \"%s\": %m",
+						blocknum, FilePathName(v->mdfd_vfd))));
+
+	if (nbytes > 0 && nbytes < BLCKSZ)	/* partial block: only logged, caller sees the short count */
+	{
+		ereport(LOG,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read block %u in file \"%s\": read only %d of %d bytes",
+						blocknum, FilePathName(v->mdfd_vfd),
+						nbytes, BLCKSZ)));
+	}
+
+	return nbytes;
+}
+
 /*
  *	mdwrite() -- Write the supplied block at the appropriate location.
  *
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 244b4ea..f0e9a7b 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -51,6 +51,8 @@ typedef struct f_smgr
 											  BlockNumber blocknum);
 	void		(*smgr_read) (SMgrRelation reln, ForkNumber forknum,
 										  BlockNumber blocknum, char *buffer);
+	int			(*smgr_tryread) (SMgrRelation reln, ForkNumber forknum,
+										  BlockNumber blocknum, char *buffer);
 	void		(*smgr_write) (SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, char *buffer, bool skipFsync);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -66,7 +68,7 @@ typedef struct f_smgr
 static const f_smgr smgrsw[] = {
 	/* magnetic disk */
 	{mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
-		mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
+		mdprefetch, mdread, mdtryread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
 		mdpreckpt, mdsync, mdpostckpt
 	}
 };
@@ -626,6 +628,22 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	(*(smgrsw[reln->smgr_which].smgr_read)) (reln, forknum, blocknum, buffer);
 }
 
+
+/*
+ *	smgrtryread() -- try to read a particular block from a relation into the
+ *				  supplied buffer.
+ *
+ *		Dispatches to the relation's storage manager smgr_tryread callback
+ *		and returns that callback's int result unchanged (for the md
+ *		storage manager this is mdtryread()'s return value).
+ */
+int
+smgrtryread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		 char *buffer)
+{
+	return (*(smgrsw[reln->smgr_which].smgr_tryread)) (reln, forknum, blocknum, buffer);
+}
+
 /*
  *	smgrwrite() -- Write the supplied buffer out.
  *
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 521ee1c..5f961af 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -40,6 +40,7 @@
 #define BM_CHECKPOINT_NEEDED	(1 << 7)		/* must write for checkpoint */
 #define BM_PERMANENT			(1 << 8)		/* permanent relation (not
 												 * unlogged) */
+#define BM_NEW					(1 << 9)		/* Not guaranteed to exist on disk */
 
 typedef bits16 BufFlags;
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ec0a254..b52591f 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -153,6 +153,7 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
 						  ForkNumber forkNum, BlockNumber blockNum,
 						  ReadBufferMode mode, BufferAccessStrategy strategy);
+extern Buffer ExtendRelation(Relation reln, ForkNumber forkNum, BufferAccessStrategy strategy);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 69a624f..07a331c 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -94,6 +94,8 @@ extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
 			 BlockNumber blocknum);
 extern void smgrread(SMgrRelation reln, ForkNumber forknum,
 		 BlockNumber blocknum, char *buffer);
+extern int smgrtryread(SMgrRelation reln, ForkNumber forknum,
+		 BlockNumber blocknum, char *buffer);
 extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
 		  BlockNumber blocknum, char *buffer, bool skipFsync);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -114,12 +116,15 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
+extern void mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync);
+extern void mdappend(SMgrRelation reln, ForkNumber forknum,
 		 BlockNumber blocknum, char *buffer, bool skipFsync);
 extern void mdprefetch(SMgrRelation reln, ForkNumber forknum,
 		   BlockNumber blocknum);
 extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	   char *buffer);
+extern int mdtryread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+	   char *buffer);
 extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
 		BlockNumber blocknum, char *buffer, bool skipFsync);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
-- 
2.3.0.149.gf3f4077.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to