On 2015-09-14 17:41:42 +0200, Andres Freund wrote:
> I pointed out how you can actually make this safely lock-free giving you
> the interesting code.
And here's an actual implementation of that approach. It's definitely a
work in progress and could easily be optimized further. I don't have any
big machines to play around with right now, though.
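
To make the patch easier to follow: the buffer header's flags, usage count,
refcount and the header spinlock are all folded into one atomic 32-bit
"state" word, and pinning becomes a compare-and-swap loop that only spins
while the header is locked. Here's a minimal standalone sketch of that
layout and of the pin loop -- C11 atomics standing in for port/atomics.h,
constants mirroring what the patch adds to buf_internals.h; purely
illustrative, not part of the patch:

/*
 * Standalone illustration of the packed buffer-state word and a lock-free
 * pin, shaped like the patch's PinBuffer(). Constants mirror the patch.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define BUF_REFCOUNT_MASK    ((1U << 18) - 1)   /* bits 0-17: refcount */
#define BUF_USAGECOUNT_MASK  0x003C0000U        /* bits 18-21: usage count */
#define BUF_USAGECOUNT_ONE   (1U << 18)
#define BUF_USAGECOUNT_SHIFT 18
#define BUF_FLAG_MASK        0xFFC00000U        /* bits 22-31: flags */

#define BM_LOCKED            (1U << 22)         /* header "spinlock" bit */
#define BM_VALID             (1U << 24)
#define BM_MAX_USAGE_COUNT   5                  /* PostgreSQL's value */

/* Pin lock-free: spin only while the header is locked, otherwise retry the
 * compare-and-swap until it succeeds. Returns whether the buffer was valid. */
static bool
pin_buffer(_Atomic uint32_t *state_p)
{
	uint32_t oldstate = atomic_load(state_p);

	for (;;)
	{
		uint32_t state = oldstate;

		/* wait for a concurrent header-lock holder to finish */
		while (state & BM_LOCKED)
			state = oldstate = atomic_load(state_p);

		state += 1;                              /* refcount++ */
		if (((state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) < BM_MAX_USAGE_COUNT)
			state += BUF_USAGECOUNT_ONE;         /* usage_count++, saturating */

		/* on failure, oldstate is refreshed with the current value */
		if (atomic_compare_exchange_weak(state_p, &oldstate, state))
			return (state & BM_VALID) != 0;
	}
}

int
main(void)
{
	_Atomic uint32_t state = BM_VALID;           /* valid, unpinned buffer */
	bool valid = pin_buffer(&state);

	printf("valid=%d refcount=%u usagecount=%u\n",
		   valid,
		   (unsigned) (atomic_load(&state) & BUF_REFCOUNT_MASK),
		   (unsigned) ((atomic_load(&state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT));
	return 0;
}

LockBufHdr() in the patch follows the same pattern, except that it CASes
BM_LOCKED into the word instead of bumping the refcount. (A read-only
pgbench -S -M prepared run with lots of clients is presumably the easiest
way to hammer exactly this path.)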
Andres
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 3ae2848..3e70792 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -95,12 +95,9 @@ InitBufferPool(void)
BufferDesc *buf = GetBufferDescriptor(i);
CLEAR_BUFFERTAG(buf->tag);
- buf->flags = 0;
- buf->usage_count = 0;
- buf->refcount = 0;
- buf->wait_backend_pid = 0;
- SpinLockInit(&buf->buf_hdr_lock);
+ pg_atomic_init_u32(&buf->state, 0);
+ buf->wait_backend_pid = 0;
buf->buf_id = i;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 8c0358e..345322a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -51,6 +51,8 @@
#include "utils/resowner_private.h"
#include "utils/timestamp.h"
+#define likely(x) __builtin_expect((x),1)
+#define unlikely(x) __builtin_expect((x),0)
/* Note: these two macros only work on shared buffers, not local ones! */
#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
@@ -774,9 +776,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
if (isLocalBuf)
{
+ uint32 state;
+
+ state = pg_atomic_read_u32(&bufHdr->state);
/* Only need to adjust flags */
- Assert(bufHdr->flags & BM_VALID);
- bufHdr->flags &= ~BM_VALID;
+ Assert(state & BM_VALID);
+ state &= ~BM_VALID;
+ pg_atomic_write_u32(&bufHdr->state, state);
}
else
{
@@ -788,8 +794,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
do
{
LockBufHdr(bufHdr);
- Assert(bufHdr->flags & BM_VALID);
- bufHdr->flags &= ~BM_VALID;
+ Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID);
+ pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID);
UnlockBufHdr(bufHdr);
} while (!StartBufferIO(bufHdr, true));
}
@@ -807,7 +813,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* it's not been recycled) but come right back here to try smgrextend
* again.
*/
- Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
+ Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
@@ -885,7 +891,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (isLocalBuf)
{
/* Only need to adjust flags */
- bufHdr->flags |= BM_VALID;
+ pg_atomic_fetch_or_u32(&bufHdr->state, BM_VALID);
}
else
{
@@ -939,7 +945,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BufferTag oldTag; /* previous identity of selected buffer */
uint32 oldHash; /* hash value for oldTag */
LWLock *oldPartitionLock; /* buffer partition lock for it */
- BufFlags oldFlags;
+ uint32 oldFlags;
int buf_id;
volatile BufferDesc *buf;
bool valid;
@@ -1013,10 +1019,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
buf = StrategyGetBuffer(strategy);
- Assert(buf->refcount == 0);
+ Assert((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 0);
/* Must copy buffer flags while we still hold the spinlock */
- oldFlags = buf->flags;
+ oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK;
/* Pin the buffer and then release the buffer spinlock */
PinBuffer_Locked(buf);
@@ -1210,8 +1216,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* recycle this buffer; we must undo everything we've done and start
* over with a new victim buffer.
*/
- oldFlags = buf->flags;
- if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
+ oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK;
+ if ((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 1 &&
+ !(oldFlags & BM_DIRTY))
break;
UnlockBufHdr(buf);
@@ -1232,12 +1239,19 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* 1 so that the buffer can survive one clock-sweep pass.)
*/
buf->tag = newTag;
- buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+ pg_atomic_fetch_and_u32(&buf->state,
+ ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
+ BM_CHECKPOINT_NEEDED | BM_IO_ERROR |
+ BM_PERMANENT |
+ BUF_USAGECOUNT_MASK));
if (relpersistence == RELPERSISTENCE_PERMANENT)
- buf->flags |= BM_TAG_VALID | BM_PERMANENT;
+ pg_atomic_fetch_or_u32(&buf->state,
+ BM_TAG_VALID | BM_PERMANENT |
+ BUF_USAGECOUNT_ONE);
else
- buf->flags |= BM_TAG_VALID;
- buf->usage_count = 1;
+ pg_atomic_fetch_or_u32(&buf->state,
+ BM_TAG_VALID |
+ BUF_USAGECOUNT_ONE);
UnlockBufHdr(buf);
@@ -1286,7 +1300,7 @@ InvalidateBuffer(volatile BufferDesc *buf)
BufferTag oldTag;
uint32 oldHash; /* hash value for oldTag */
LWLock *oldPartitionLock; /* buffer partition lock for it */
- BufFlags oldFlags;
+ uint32 oldFlags;
/* Save the original buffer tag before dropping the spinlock */
oldTag = buf->tag;
@@ -1329,7 +1343,7 @@ retry:
* yet done StartBufferIO, WaitIO will fall through and we'll effectively
* be busy-looping here.)
*/
- if (buf->refcount != 0)
+ if ((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) != 0)
{
UnlockBufHdr(buf);
LWLockRelease(oldPartitionLock);
@@ -1344,10 +1358,9 @@ retry:
* Clear out the buffer's tag and flags. We must do this to ensure that
* linear scans of the buffer array don't think the buffer is valid.
*/
- oldFlags = buf->flags;
+ oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK;
CLEAR_BUFFERTAG(buf->tag);
- buf->flags = 0;
- buf->usage_count = 0;
+ pg_atomic_fetch_and_u32(&buf->state, BM_LOCKED | ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK));
UnlockBufHdr(buf);
@@ -1399,12 +1412,12 @@ MarkBufferDirty(Buffer buffer)
LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
+ Assert((pg_atomic_read_u32(&bufHdr->state) & BUF_REFCOUNT_MASK) > 0);
/*
* If the buffer was not dirty already, do vacuum accounting.
*/
- if (!(bufHdr->flags & BM_DIRTY))
+ if (!(pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY))
{
VacuumPageDirty++;
pgBufferUsage.shared_blks_dirtied++;
@@ -1412,7 +1425,8 @@ MarkBufferDirty(Buffer buffer)
VacuumCostBalance += VacuumCostPageDirty;
}
- bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+ pg_atomic_fetch_or_u32(&bufHdr->state,
+ BM_DIRTY | BM_JUST_DIRTIED);
UnlockBufHdr(bufHdr);
}
@@ -1495,23 +1509,39 @@ PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
if (ref == NULL)
{
+ uint32 state;
+ uint32 oldstate;
+
ReservePrivateRefCountEntry();
ref = NewPrivateRefCountEntry(b);
- LockBufHdr(buf);
- buf->refcount++;
- if (strategy == NULL)
- {
- if (buf->usage_count < BM_MAX_USAGE_COUNT)
- buf->usage_count++;
- }
- else
+ state = pg_atomic_read_u32(&buf->state);
+ oldstate = state;
+
+ while (true)
{
- if (buf->usage_count == 0)
- buf->usage_count = 1;
+ /* spin-wait till lock is free */
+ while (unlikely(state & BM_LOCKED))
+ {
+ pg_spin_delay();
+ state = pg_atomic_read_u32(&buf->state);
+ }
+
+ /* increase refcount */
+ state += 1;
+
+ /* increase usagecount unless already max */
+ if (((state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) != BM_MAX_USAGE_COUNT)
+ state += BUF_USAGECOUNT_ONE;
+
+ result = (state & BM_VALID) != 0;
+
+ if (likely(pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state)))
+ break;
+
+ /* get ready for next loop, oldstate has been updated by cas */
+ state = oldstate;
}
- result = (buf->flags & BM_VALID) != 0;
- UnlockBufHdr(buf);
}
else
{
@@ -1558,7 +1588,7 @@ PinBuffer_Locked(volatile BufferDesc *buf)
*/
Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
- buf->refcount++;
+ pg_atomic_fetch_add_u32(&buf->state, 1);
UnlockBufHdr(buf);
b = BufferDescriptorGetBuffer(buf);
@@ -1594,30 +1624,41 @@ UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
ref->refcount--;
if (ref->refcount == 0)
{
+ uint32 state;
+
/* I'd better not still hold any locks on the buffer */
Assert(!LWLockHeldByMe(buf->content_lock));
Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
- LockBufHdr(buf);
-
- /* Decrement the shared reference count */
- Assert(buf->refcount > 0);
- buf->refcount--;
+ /*
+ * Decrement the shared reference count.
+ *
+ * Arguably it'd be more robust if we checked for BM_LOCKED here, but
+ * currently all manipulation of ->state for shared buffers is through
+ * atomics.
+ */
+ state = pg_atomic_fetch_sub_u32(&buf->state, 1);
+ Assert((state & BUF_REFCOUNT_MASK) > 0);
/* Support LockBufferForCleanup() */
- if ((buf->flags & BM_PIN_COUNT_WAITER) &&
- buf->refcount == 1)
+ if (state & BM_PIN_COUNT_WAITER)
{
- /* we just released the last pin other than the waiter's */
- int wait_backend_pid = buf->wait_backend_pid;
+ LockBufHdr(buf);
- buf->flags &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(buf);
- ProcSendSignal(wait_backend_pid);
- }
- else
- UnlockBufHdr(buf);
+ if (pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER &&
+ (pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 1)
+ {
+ /* we just released the last pin other than the waiter's */
+ int wait_backend_pid = buf->wait_backend_pid;
+ pg_atomic_fetch_and_u32(&buf->state,
+ ~BM_PIN_COUNT_WAITER);
+ UnlockBufHdr(buf);
+ ProcSendSignal(wait_backend_pid);
+ }
+ else
+ UnlockBufHdr(buf);
+ }
ForgetPrivateRefCountEntry(ref);
}
}
@@ -1680,9 +1721,10 @@ BufferSync(int flags)
*/
LockBufHdr(bufHdr);
- if ((bufHdr->flags & mask) == mask)
+ if ((pg_atomic_read_u32(&bufHdr->state) & mask) == mask)
{
- bufHdr->flags |= BM_CHECKPOINT_NEEDED;
+ pg_atomic_fetch_or_u32(&bufHdr->state,
+ BM_CHECKPOINT_NEEDED);
num_to_write++;
}
@@ -1721,7 +1763,7 @@ BufferSync(int flags)
* write the buffer though we didn't need to. It doesn't seem worth
* guarding against this, though.
*/
- if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
+ if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
{
if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
{
@@ -2081,6 +2123,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
{
volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0;
+ uint32 state;
ReservePrivateRefCountEntry();
@@ -2095,7 +2138,10 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
*/
LockBufHdr(bufHdr);
- if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
+ state = pg_atomic_read_u32(&bufHdr->state);
+
+ if ((state & BUF_REFCOUNT_MASK) == 0 &&
+ (state & BUF_USAGECOUNT_MASK) == 0)
result |= BUF_REUSABLE;
else if (skip_recently_used)
{
@@ -2104,7 +2150,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
return result;
}
- if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
+ if (!(state & BM_VALID) || !(state & BM_DIRTY))
{
/* It's clean, so nothing to do */
UnlockBufHdr(bufHdr);
@@ -2256,6 +2302,7 @@ PrintBufferLeakWarning(Buffer buffer)
int32 loccount;
char *path;
BackendId backend;
+ uint32 state;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
@@ -2273,12 +2320,13 @@ PrintBufferLeakWarning(Buffer buffer)
/* theoretically we should lock the bufhdr here */
path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
+ state = pg_atomic_read_u32(&buf->state);
elog(WARNING,
"buffer refcount leak: [%03d] "
"(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
buffer, path,
- buf->tag.blockNum, buf->flags,
- buf->refcount, loccount);
+ buf->tag.blockNum, state & BUF_FLAG_MASK,
+ state & BUF_REFCOUNT_MASK, loccount);
pfree(path);
}
@@ -2424,7 +2472,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
recptr = BufferGetLSN(buf);
/* To check if block content changes while flushing. - vadim 01/17/97 */
- buf->flags &= ~BM_JUST_DIRTIED;
+ pg_atomic_fetch_and_u32(&buf->state, ~BM_JUST_DIRTIED);
UnlockBufHdr(buf);
/*
@@ -2444,7 +2492,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
* disastrous system-wide consequences. To make sure that can't happen,
* skip the flush if the buffer isn't permanent.
*/
- if (buf->flags & BM_PERMANENT)
+ if (pg_atomic_read_u32(&buf->state) & BM_PERMANENT)
XLogFlush(recptr);
/*
@@ -2538,7 +2586,7 @@ BufferIsPermanent(Buffer buffer)
* old value or the new value, but not random garbage.
*/
bufHdr = GetBufferDescriptor(buffer - 1);
- return (bufHdr->flags & BM_PERMANENT) != 0;
+ return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
}
/*
@@ -2874,7 +2922,8 @@ FlushRelationBuffers(Relation rel)
{
bufHdr = GetLocalBufferDescriptor(i);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+ == (BM_VALID | BM_DIRTY))
{
ErrorContextCallback errcallback;
Page localpage;
@@ -2895,7 +2944,7 @@ FlushRelationBuffers(Relation rel)
localpage,
false);
- bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+ pg_atomic_fetch_and_u32(&bufHdr->state, ~(BM_DIRTY | BM_JUST_DIRTIED));
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -2923,7 +2972,8 @@ FlushRelationBuffers(Relation rel)
LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+ == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(bufHdr->content_lock, LW_SHARED);
@@ -2975,7 +3025,8 @@ FlushDatabaseBuffers(Oid dbid)
LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid &&
- (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+ (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+ == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(bufHdr->content_lock, LW_SHARED);
@@ -3093,12 +3144,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
* is only intended to be used in cases where failing to write out the
* data would be harmless anyway, it doesn't really matter.
*/
- if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+ if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
XLogRecPtr lsn = InvalidXLogRecPtr;
bool dirtied = false;
bool delayChkpt = false;
+ uint32 state;
/*
* If we need to protect hint bit updates from torn writes, WAL-log a
@@ -3109,7 +3161,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
* We don't check full_page_writes here because that logic is included
* when we call XLogInsert() since the value changes dynamically.
*/
- if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
+ if (XLogHintBitIsNeeded() && (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
{
/*
* If we're in recovery we cannot dirty a page because of a hint.
@@ -3149,8 +3201,12 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
}
LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (!(bufHdr->flags & BM_DIRTY))
+
+ state = pg_atomic_read_u32(&bufHdr->state);
+
+ Assert((state & BUF_REFCOUNT_MASK) > 0);
+
+ if (!(state & BM_DIRTY))
{
dirtied = true; /* Means "will be dirtied by this action" */
@@ -3170,7 +3226,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
if (!XLogRecPtrIsInvalid(lsn))
PageSetLSN(page, lsn);
}
- bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+ pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED);
+
UnlockBufHdr(bufHdr);
if (delayChkpt)
@@ -3208,9 +3266,9 @@ UnlockBuffers(void)
* Don't complain if flag bit not set; it could have been reset but we
* got a cancel/die interrupt before getting the signal.
*/
- if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+ if ((pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_pid == MyProcPid)
- buf->flags &= ~BM_PIN_COUNT_WAITER;
+ pg_atomic_fetch_and_u32(&buf->state, ~BM_PIN_COUNT_WAITER);
UnlockBufHdr(buf);
@@ -3304,25 +3362,30 @@ LockBufferForCleanup(Buffer buffer)
for (;;)
{
+		uint32		state;
+
/* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (bufHdr->refcount == 1)
+
+ state = pg_atomic_read_u32(&bufHdr->state);
+
+ Assert((state & BUF_REFCOUNT_MASK) > 0);
+ if ((state & BUF_REFCOUNT_MASK) == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr);
return;
}
/* Failed, so mark myself as waiting for pincount 1 */
- if (bufHdr->flags & BM_PIN_COUNT_WAITER)
+ if (state & BM_PIN_COUNT_WAITER)
{
UnlockBufHdr(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "multiple backends attempting to wait for pincount 1");
}
bufHdr->wait_backend_pid = MyProcPid;
- bufHdr->flags |= BM_PIN_COUNT_WAITER;
+ pg_atomic_fetch_or_u32(&bufHdr->state, BM_PIN_COUNT_WAITER);
PinCountWaitBuf = bufHdr;
UnlockBufHdr(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -3349,9 +3412,9 @@ LockBufferForCleanup(Buffer buffer)
* better be safe.
*/
LockBufHdr(bufHdr);
- if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 &&
+ if ((pg_atomic_read_u32(&bufHdr->state) & BM_PIN_COUNT_WAITER) != 0 &&
bufHdr->wait_backend_pid == MyProcPid)
- bufHdr->flags &= ~BM_PIN_COUNT_WAITER;
+ pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_PIN_COUNT_WAITER);
UnlockBufHdr(bufHdr);
PinCountWaitBuf = NULL;
@@ -3393,22 +3456,25 @@ bool
ConditionalLockBufferForCleanup(Buffer buffer)
{
volatile BufferDesc *bufHdr;
+ uint32 refcount;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
{
+ refcount = LocalRefCount[-buffer - 1];
/* There should be exactly one pin */
- Assert(LocalRefCount[-buffer - 1] > 0);
- if (LocalRefCount[-buffer - 1] != 1)
+ Assert(refcount > 0);
+ if (refcount != 1)
return false;
/* Nobody else to wait for */
return true;
}
/* There should be exactly one local pin */
- Assert(GetPrivateRefCount(buffer) > 0);
- if (GetPrivateRefCount(buffer) != 1)
+ refcount = GetPrivateRefCount(buffer);
+ Assert(refcount);
+ if (refcount != 1)
return false;
/* Try to acquire lock */
@@ -3417,8 +3483,10 @@ ConditionalLockBufferForCleanup(Buffer buffer)
bufHdr = GetBufferDescriptor(buffer - 1);
LockBufHdr(bufHdr);
- Assert(bufHdr->refcount > 0);
- if (bufHdr->refcount == 1)
+
+ refcount = pg_atomic_read_u32(&bufHdr->state) & BUF_REFCOUNT_MASK;
+ Assert(refcount > 0);
+ if (refcount == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr);
@@ -3456,7 +3524,7 @@ WaitIO(volatile BufferDesc *buf)
*/
for (;;)
{
- BufFlags sv_flags;
+ uint32 state;
/*
* It may not be necessary to acquire the spinlock to check the flag
@@ -3464,9 +3532,10 @@ WaitIO(volatile BufferDesc *buf)
* play it safe.
*/
LockBufHdr(buf);
- sv_flags = buf->flags;
+ state = pg_atomic_read_u32(&buf->state);
UnlockBufHdr(buf);
- if (!(sv_flags & BM_IO_IN_PROGRESS))
+
+ if (!(state & BM_IO_IN_PROGRESS))
break;
LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
LWLockRelease(buf->io_in_progress_lock);
@@ -3494,6 +3563,8 @@ WaitIO(volatile BufferDesc *buf)
static bool
StartBufferIO(volatile BufferDesc *buf, bool forInput)
{
+ uint32 state;
+
Assert(!InProgressBuf);
for (;;)
@@ -3506,7 +3577,9 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput)
LockBufHdr(buf);
- if (!(buf->flags & BM_IO_IN_PROGRESS))
+ state = pg_atomic_read_u32(&buf->state);
+
+ if (!(state & BM_IO_IN_PROGRESS))
break;
/*
@@ -3522,7 +3595,7 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput)
/* Once we get here, there is definitely no I/O active on this buffer */
- if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+ if (forInput ? (state & BM_VALID) : !(state & BM_DIRTY))
{
/* someone else already did the I/O */
UnlockBufHdr(buf);
@@ -3530,7 +3603,7 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput)
return false;
}
- buf->flags |= BM_IO_IN_PROGRESS;
+ pg_atomic_fetch_or_u32(&buf->state, BM_IO_IN_PROGRESS);
UnlockBufHdr(buf);
@@ -3565,11 +3638,13 @@ TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
LockBufHdr(buf);
- Assert(buf->flags & BM_IO_IN_PROGRESS);
- buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
- if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
- buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
- buf->flags |= set_flag_bits;
+ Assert(pg_atomic_read_u32(&buf->state) & BM_IO_IN_PROGRESS);
+
+ pg_atomic_fetch_and_u32(&buf->state, ~(BM_IO_IN_PROGRESS | BM_IO_ERROR));
+ if (clear_dirty && !(pg_atomic_read_u32(&buf->state) & BM_JUST_DIRTIED))
+ pg_atomic_fetch_and_u32(&buf->state, ~(BM_DIRTY | BM_CHECKPOINT_NEEDED));
+
+ pg_atomic_fetch_or_u32(&buf->state, set_flag_bits);
UnlockBufHdr(buf);
@@ -3603,23 +3678,24 @@ AbortBufferIO(void)
LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
LockBufHdr(buf);
- Assert(buf->flags & BM_IO_IN_PROGRESS);
+ Assert(pg_atomic_read_u32(&buf->state) & BM_IO_IN_PROGRESS);
if (IsForInput)
{
- Assert(!(buf->flags & BM_DIRTY));
+ Assert(!(pg_atomic_read_u32(&buf->state) & BM_DIRTY));
+
/* We'd better not think buffer is valid yet */
- Assert(!(buf->flags & BM_VALID));
+ Assert(!(pg_atomic_read_u32(&buf->state) & BM_VALID));
UnlockBufHdr(buf);
}
else
{
- BufFlags sv_flags;
+ uint32 state;
- sv_flags = buf->flags;
- Assert(sv_flags & BM_DIRTY);
+ state = pg_atomic_read_u32(&buf->state);
+ Assert(state & BM_DIRTY);
UnlockBufHdr(buf);
/* Issue notice if this is not the first failure... */
- if (sv_flags & BM_IO_ERROR)
+ if (state & BM_IO_ERROR)
{
/* Buffer is pinned, so we can read tag without spinlock */
char *path;
@@ -3701,3 +3777,33 @@ rnode_comparator(const void *p1, const void *p2)
else
return 0;
}
+
+void
+LockBufHdr(volatile BufferDesc *desc)
+{
+ uint32 state = pg_atomic_read_u32(&desc->state);
+
+ for (;;)
+ {
+ /* wait till lock is free */
+ while (unlikely(state & BM_LOCKED))
+ {
+ pg_spin_delay();
+ state = pg_atomic_read_u32(&desc->state);
+
+			/* Add exponential backoff? Should seldom be contended, though. */
+ }
+
+ /* and try to get lock */
+ if (pg_atomic_compare_exchange_u32(&desc->state, &state, state | BM_LOCKED))
+ break;
+ }
+}
+
+void
+UnlockBufHdr(volatile BufferDesc *desc)
+{
+ Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED);
+
+ pg_atomic_fetch_sub_u32(&desc->state, BM_LOCKED);
+}
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index bc2c773..3f2227b 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -250,6 +250,8 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
{
while (true)
{
+ uint32 state;
+
/* Acquire the spinlock to remove element from the freelist */
SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
@@ -280,7 +282,9 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
* of 8.3, but we'd better check anyway.)
*/
LockBufHdr(buf);
- if (buf->refcount == 0 && buf->usage_count == 0)
+ state = pg_atomic_read_u32(&buf->state);
+ if ((state & BUF_REFCOUNT_MASK) == 0
+ && (state & BUF_USAGECOUNT_MASK) == 0)
{
if (strategy != NULL)
AddBufferToRing(strategy, buf);
@@ -295,6 +299,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
trycounter = NBuffers;
for (;;)
{
+ uint32 state;
buf = GetBufferDescriptor(ClockSweepTick());
@@ -303,11 +308,15 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
* it; decrement the usage_count (unless pinned) and keep scanning.
*/
LockBufHdr(buf);
- if (buf->refcount == 0)
+
+ state = pg_atomic_read_u32(&buf->state);
+
+ if ((state & BUF_REFCOUNT_MASK) == 0)
{
- if (buf->usage_count > 0)
+ if ((state & BUF_USAGECOUNT_MASK) != 0)
{
- buf->usage_count--;
+ pg_atomic_fetch_sub_u32(&buf->state, BUF_USAGECOUNT_ONE);
+
trycounter = NBuffers;
}
else
@@ -589,6 +598,8 @@ GetBufferFromRing(BufferAccessStrategy strategy)
{
volatile BufferDesc *buf;
Buffer bufnum;
+ uint32 state;
+ uint32 usagecount;
/* Advance to next ring slot */
if (++strategy->current >= strategy->ring_size)
@@ -617,7 +628,10 @@ GetBufferFromRing(BufferAccessStrategy strategy)
*/
buf = GetBufferDescriptor(bufnum - 1);
LockBufHdr(buf);
- if (buf->refcount == 0 && buf->usage_count <= 1)
+ state = pg_atomic_read_u32(&buf->state);
+ usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT;
+ if ((state & BUF_REFCOUNT_MASK) == 0
+ && usagecount <= 1)
{
strategy->current_was_in_ring = true;
return buf;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3144afe..1e11d71 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
int b;
int trycounter;
bool found;
+ uint32 state;
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
@@ -128,16 +129,25 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
#endif
+ state = pg_atomic_read_u32(&bufHdr->state);
+
/* this part is equivalent to PinBuffer for a shared buffer */
if (LocalRefCount[b] == 0)
{
- if (bufHdr->usage_count < BM_MAX_USAGE_COUNT)
- bufHdr->usage_count++;
+ int usagecount;
+
+ usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT;
+
+ if (usagecount < BM_MAX_USAGE_COUNT)
+ {
+ state += BUF_USAGECOUNT_ONE;
+ pg_atomic_write_u32(&bufHdr->state, state);
+ }
}
LocalRefCount[b]++;
ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(bufHdr));
- if (bufHdr->flags & BM_VALID)
+ if (state & BM_VALID)
*foundPtr = TRUE;
else
{
@@ -169,9 +179,15 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
if (LocalRefCount[b] == 0)
{
- if (bufHdr->usage_count > 0)
+ int usagecount;
+
+ state = pg_atomic_read_u32(&bufHdr->state);
+ usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT;
+
+ if (usagecount > 0)
{
- bufHdr->usage_count--;
+ state -= BUF_USAGECOUNT_ONE;
+ pg_atomic_write_u32(&bufHdr->state, state);
trycounter = NLocBuffer;
}
else
@@ -193,7 +209,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
* this buffer is not referenced but it might still be dirty. if that's
* the case, write it out before reusing it!
*/
- if (bufHdr->flags & BM_DIRTY)
+ if (state & BM_DIRTY)
{
SMgrRelation oreln;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
@@ -211,7 +227,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
false);
/* Mark not-dirty now in case we error out below */
- bufHdr->flags &= ~BM_DIRTY;
+ state &= ~BM_DIRTY;
+ pg_atomic_write_u32(&bufHdr->state, state);
pgBufferUsage.local_blks_written++;
}
@@ -228,7 +245,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
/*
* Update the hash table: remove old entry, if any, and make new one.
*/
- if (bufHdr->flags & BM_TAG_VALID)
+ if (state & BM_TAG_VALID)
{
hresult = (LocalBufferLookupEnt *)
hash_search(LocalBufHash, (void *) &bufHdr->tag,
@@ -237,7 +254,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
elog(ERROR, "local buffer hash table corrupted");
/* mark buffer invalid just in case hash insert fails */
CLEAR_BUFFERTAG(bufHdr->tag);
- bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID);
+ state &= ~(BM_VALID | BM_TAG_VALID);
+ pg_atomic_write_u32(&bufHdr->state, state);
}
hresult = (LocalBufferLookupEnt *)
@@ -250,9 +268,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
* it's all ours now.
*/
bufHdr->tag = newTag;
- bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
- bufHdr->flags |= BM_TAG_VALID;
- bufHdr->usage_count = 1;
+ state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
+ state |= BM_TAG_VALID;
+ state &= ~BUF_USAGECOUNT_MASK;
+ state += BUF_USAGECOUNT_ONE;
+ pg_atomic_write_u32(&bufHdr->state, state);
*foundPtr = FALSE;
return bufHdr;
@@ -267,6 +287,7 @@ MarkLocalBufferDirty(Buffer buffer)
{
int bufid;
BufferDesc *bufHdr;
+ uint32 state;
Assert(BufferIsLocal(buffer));
@@ -280,10 +301,13 @@ MarkLocalBufferDirty(Buffer buffer)
bufHdr = GetLocalBufferDescriptor(bufid);
- if (!(bufHdr->flags & BM_DIRTY))
+ state = pg_atomic_read_u32(&bufHdr->state);
+
+ if (!(state & BM_DIRTY))
pgBufferUsage.local_blks_dirtied++;
- bufHdr->flags |= BM_DIRTY;
+ state |= BM_DIRTY;
+ pg_atomic_write_u32(&bufHdr->state, state);
}
/*
@@ -307,8 +331,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
{
BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
LocalBufferLookupEnt *hresult;
+ uint32 state;
- if ((bufHdr->flags & BM_TAG_VALID) &&
+ state = pg_atomic_read_u32(&bufHdr->state);
+
+ if ((state & BM_TAG_VALID) &&
RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
bufHdr->tag.forkNum == forkNum &&
bufHdr->tag.blockNum >= firstDelBlock)
@@ -327,8 +354,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag);
- bufHdr->flags = 0;
- bufHdr->usage_count = 0;
+ state &= ~BUF_FLAG_MASK;
+ state &= ~BUF_USAGECOUNT_MASK;
+ pg_atomic_write_u32(&bufHdr->state, state);
}
}
}
@@ -349,8 +377,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
{
BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
LocalBufferLookupEnt *hresult;
+ uint32 state;
+
+ state = pg_atomic_read_u32(&bufHdr->state);
- if ((bufHdr->flags & BM_TAG_VALID) &&
+ if ((state & BM_TAG_VALID) &&
RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{
if (LocalRefCount[i] != 0)
@@ -367,8 +398,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag);
- bufHdr->flags = 0;
- bufHdr->usage_count = 0;
+ state &= ~BUF_FLAG_MASK;
+ state &= ~BUF_USAGECOUNT_MASK;
+ pg_atomic_write_u32(&bufHdr->state, state);
}
}
}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 521ee1c..92889e6 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -20,29 +20,40 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
+#include "port/atomics.h"
#include "storage/spin.h"
#include "utils/relcache.h"
/*
+ * State is a single 32-bit variable containing:
+ * 10 bit flags (bits 22-31)
+ * 4 bit usage count (bits 18-21)
+ * 18 bit refcount (bits 0-17)
+ */
+#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
+#define BUF_FLAG_MASK 0xFFC00000U
+#define BUF_USAGECOUNT_MASK 0x003C0000U
+#define BUF_USAGECOUNT_ONE (1U << 18)
+#define BUF_USAGECOUNT_SHIFT 18
+
+/*
* Flags for buffer descriptors
*
* Note: TAG_VALID essentially means that there is a buffer hashtable
* entry associated with the buffer's tag.
*/
-#define BM_DIRTY (1 << 0) /* data needs writing */
-#define BM_VALID (1 << 1) /* data is valid */
-#define BM_TAG_VALID (1 << 2) /* tag is assigned */
-#define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */
-#define BM_IO_ERROR (1 << 4) /* previous I/O failed */
-#define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */
-#define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */
-#define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */
-#define BM_PERMANENT (1 << 8) /* permanent relation (not
+#define BM_LOCKED (1U << 22) /* buffer header is locked */
+#define BM_DIRTY (1U << 23) /* data needs writing */
+#define BM_VALID (1U << 24) /* data is valid */
+#define BM_TAG_VALID (1U << 25) /* tag is assigned */
+#define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */
+#define BM_IO_ERROR (1U << 27) /* previous I/O failed */
+#define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */
+#define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */
+#define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */
+#define BM_PERMANENT (1U << 31) /* permanent relation (not
* unlogged) */
-
-typedef bits16 BufFlags;
-
/*
* The maximum allowed value of usage_count represents a tradeoff between
* accuracy and speed of the clock-sweep buffer management algorithm. A
@@ -137,12 +148,11 @@ typedef struct buftag
typedef struct BufferDesc
{
BufferTag tag; /* ID of page contained in buffer */
- BufFlags flags; /* see bit definitions above */
- uint16 usage_count; /* usage counter for clock sweep code */
- unsigned refcount; /* # of backends holding pins on buffer */
- int wait_backend_pid; /* backend PID of pin-count waiter */
- slock_t buf_hdr_lock; /* protects the above fields */
+	/* state of the buffer, containing flags, refcount and usage count */
+ pg_atomic_uint32 state;
+
+ int wait_backend_pid; /* backend PID of pin-count waiter */
int buf_id; /* buffer's index number (from 0) */
int freeNext; /* link in freelist chain */
@@ -192,16 +202,11 @@ typedef union BufferDescPadded
#define FREENEXT_NOT_IN_LIST (-2)
/*
- * Macros for acquiring/releasing a shared buffer header's spinlock.
- * Do not apply these to local buffers!
- *
- * Note: as a general coding rule, if you are using these then you probably
- * need to be using a volatile-qualified pointer to the buffer header, to
- * ensure that the compiler doesn't rearrange accesses to the header to
- * occur before or after the spinlock is acquired/released.
+ * Functions for acquiring/releasing a shared buffer header's spinlock. Do
+ * not apply these to local buffers! FIXUP!
*/
-#define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
-#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock)
+extern void LockBufHdr(volatile BufferDesc *desc);
+extern void UnlockBufHdr(volatile BufferDesc *desc);
/* in buf_init.c */
--