On 2016-03-28 22:50:49 +0530, Amit Kapila wrote:
> On Fri, Sep 11, 2015 at 8:01 PM, Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> >
> > On Thu, Sep 3, 2015 at 5:11 PM, Andres Freund <and...@anarazel.de> wrote:
> > >
> >
> > Updated comments and the patch (increate_clog_bufs_v2.patch)
> > containing the same is attached.
> >
> 
> Andres mentioned to me in an off-list discussion that he thinks we should
> first try to fix the clog buffers problem, as he sees in his tests that
> clog buffer replacement is one of the bottlenecks. He also suggested a
> test to see whether the increase in buffers could lead to a regression.
> The basic idea of the test was to force every CLOG access to be a disk
> access. Based on his suggestion, I have written a SQL statement that
> makes every CLOG access a disk access; the query used for this is below:
> With ins AS (INSERT INTO test_clog_access values(default) RETURNING c1)
> Select * from test_clog_access where c1 = (Select c1 from ins) - 32768 *
> :client_id;
> 
> Test Results
> ---------------------
> HEAD (commit d12e5bb7) - Clog Buffers - 32
> Patch-1 - Clog Buffers - 64
> Patch-2 - Clog Buffers - 128
>
> Patch_Ver/Client_Count      1       64
> HEAD                    12677    57470
> Patch-1                 12305    58079
> Patch-2                 12761    58637
> 
> The above data is the median of three 10-minute runs, and it indicates
> that there is no substantial dip from increasing the clog buffers.
> 
> The test scripts used are attached to this mail.  In perf_clog_access.sh
> you need to change the data_directory path for your machine, and you
> might also want to change the binary name if you build postgres binaries
> under different names.
> 
> Andres, is this test in line with what you have in mind?

Yes, that looks good. My testing shows that increasing the number of
buffers can both increase throughput and reduce latency variance. The
former is a smaller effect with one of the discussed patches applied;
the latter effect actually seems to grow in scale (together with the
increased throughput).


I've attached patches to:
0001: Implement a 64bit atomics fallback and optimize 64bit reads/writes
      (usage sketch below)
0002: Increase the max number of clog buffers
0003: Edited version of Simon's clog scalability patch
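
For reference, a minimal sketch of how the u64 API from 0001 is meant to
be used once it's available unconditionally. This is illustrative only --
the function and variable below are not part of the attached patches:

#include "port/atomics.h"

/* illustrative only; in real code this would live in shared memory */
static pg_atomic_uint64 example_counter;

static void
example_u64_usage(void)
{
    uint64      expected;

    /* once, e.g. during shared memory initialization */
    pg_atomic_init_u64(&example_counter, 0);

    /* atomic increment, using native atomics or the spinlock fallback */
    pg_atomic_fetch_add_u64(&example_counter, 1);

    /*
     * Tear-free read; cheap on platforms that define
     * PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY.
     */
    expected = pg_atomic_read_u64(&example_counter);

    /* strong compare-and-swap; on failure 'expected' is updated */
    if (!pg_atomic_compare_exchange_u64(&example_counter, &expected,
                                        expected + 1))
    {
        /* somebody else changed the value concurrently */
    }
}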

WRT 0003 - still clearly WIP - I've:
- made group_lsn a pg_atomic_uint64 *, to allow for tear-free reads
- split the content lock from the IO lock
- made SimpleLruReadPage_optShared always return with only the share lock
  held
- implemented a different, experimental concurrency model for
  SetStatusBit using cmpxchg; a #define USE_CONTENT_LOCK controls which
  variant is used (see the sketch below)
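
To make the cmpxchg model a bit more concrete, here is a compressed,
illustrative sketch of the lock-free status update; the real code is in
TransactionIdSetStatusBit() in 0003, which uses the GCC builtin directly
and notes that a proper abstraction would be needed:

#include <stdbool.h>

/* 2 status bits per xact, as in the existing clog.c */
#define CLOG_BITS_PER_XACT  2

static void
set_status_bits_sketch(char *byteptr, int bshift, int status)
{
    while (true)
    {
        char    origval = *((volatile char *) byteptr);
        char    newval = origval;

        /* modify only the two bits belonging to this xact */
        newval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
        newval |= (status << bshift);

        /* strong CAS: succeeds only if nobody changed the byte meanwhile */
        if (__atomic_compare_exchange_n(byteptr, &origval, newval, false,
                                        __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            break;
    }
}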

I've tested this and saw it outperform Amit's approach, especially so
when using a read/write mix rather than only reads. I saw an over 30%
increase on a large EC2 instance with -btpcb-like@1 -bselect-only@3. But
that's in a virtualized environment, not very good for reproducibility.

Amit, could you run benchmarks on your bigger hardware? Both with
USE_CONTENT_LOCK commented out and in?

I think we should go for 0001 and 0002 unconditionally, and then evaluate
whether to go with your approach or with 0003 from above. If the latter,
we've got some cleanup to do :)

Greetings,

Andres Freund
From 2f8828097337abb73effa098dab8bf61f2e7f3bb Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 30 Mar 2016 01:05:19 +0200
Subject: [PATCH 1/3] Improve 64bit atomics support.

When adding atomics back in b64d92f1a, I added 64bit support as
optional; there wasn't yet a direct user in sight.  That turned out to
be a bit short-sighted; it'd already have been useful a number of times.

Add a fallback implementation of 64bit atomics, just like the one we
have for 32bit atomics.

Additionally, optimize reads/writes of 64bit values on a number of
platforms where aligned accesses of that size are atomic. Whether that
holds can now be tested with PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY.
---
 src/backend/port/atomics.c           | 63 ++++++++++++++++++++++++++++++++++++
 src/include/port/atomics.h           | 13 +++-----
 src/include/port/atomics/arch-ia64.h |  3 ++
 src/include/port/atomics/arch-ppc.h  |  3 ++
 src/include/port/atomics/arch-x86.h  | 10 ++++++
 src/include/port/atomics/fallback.h  | 33 +++++++++++++++++++
 src/include/port/atomics/generic.h   | 24 +++++++++++---
 src/test/regress/regress.c           |  4 ---
 8 files changed, 137 insertions(+), 16 deletions(-)

diff --git a/src/backend/port/atomics.c b/src/backend/port/atomics.c
index 4972c30..8e53311 100644
--- a/src/backend/port/atomics.c
+++ b/src/backend/port/atomics.c
@@ -146,3 +146,66 @@ pg_atomic_fetch_add_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_)
 }
 
 #endif   /* PG_HAVE_ATOMIC_U32_SIMULATION */
+
+
+#ifdef PG_HAVE_ATOMIC_U64_SIMULATION
+
+void
+pg_atomic_init_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val_)
+{
+	StaticAssertStmt(sizeof(ptr->sema) >= sizeof(slock_t),
+					 "size mismatch of atomic_flag vs slock_t");
+
+	/*
+	 * If we're using semaphore based atomic flags, be careful about nested
+	 * usage of atomics while a spinlock is held.
+	 */
+#ifndef HAVE_SPINLOCKS
+	s_init_lock_sema((slock_t *) &ptr->sema, true);
+#else
+	SpinLockInit((slock_t *) &ptr->sema);
+#endif
+	ptr->value = val_;
+}
+
+bool
+pg_atomic_compare_exchange_u64_impl(volatile pg_atomic_uint64 *ptr,
+									uint64 *expected, uint64 newval)
+{
+	bool		ret;
+
+	/*
+	 * Do atomic op under a spinlock. It might look like we could just skip
+	 * the cmpxchg if the lock isn't available, but that'd just emulate a
+	 * 'weak' compare and swap. I.e. one that allows spurious failures. Since
+	 * several algorithms rely on a strong variant and that is efficiently
+	 * implementable on most major architectures let's emulate it here as
+	 * well.
+	 */
+	SpinLockAcquire((slock_t *) &ptr->sema);
+
+	/* perform compare/exchange logic */
+	ret = ptr->value == *expected;
+	*expected = ptr->value;
+	if (ret)
+		ptr->value = newval;
+
+	/* and release lock */
+	SpinLockRelease((slock_t *) &ptr->sema);
+
+	return ret;
+}
+
+uint64
+pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_)
+{
+	uint64		oldval;
+
+	SpinLockAcquire((slock_t *) &ptr->sema);
+	oldval = ptr->value;
+	ptr->value += add_;
+	SpinLockRelease((slock_t *) &ptr->sema);
+	return oldval;
+}
+
+#endif   /* PG_HAVE_ATOMIC_U64_SIMULATION */
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index f7884d7..894c3b3 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -12,13 +12,14 @@
  * * pg_compiler_barrier(), pg_write_barrier(), pg_read_barrier()
  * * pg_atomic_compare_exchange_u32(), pg_atomic_fetch_add_u32()
  * * pg_atomic_test_set_flag(), pg_atomic_init_flag(), pg_atomic_clear_flag()
+ * * PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY should be defined if appropriate.
  *
  * There exist generic, hardware independent, implementations for several
  * compilers which might be sufficient, although possibly not optimal, for a
  * new platform. If no such generic implementation is available spinlocks (or
  * even OS provided semaphores) will be used to implement the API.
  *
- * Implement the _u64 variants if and only if your platform can use them
+ * Implement _u64 atomics if and only if your platform can use them
  * efficiently (and obviously correctly).
  *
  * Use higher level functionality (lwlocks, spinlocks, heavyweight locks)
@@ -110,9 +111,9 @@
 
 /*
  * Provide a full fallback of the pg_*_barrier(), pg_atomic**_flag and
- * pg_atomic_*_u32 APIs for platforms without sufficient spinlock and/or
- * atomics support. In the case of spinlock backed atomics the emulation is
- * expected to be efficient, although less so than native atomics support.
+ * pg_atomic_* APIs for platforms without sufficient spinlock and/or atomics
+ * support. In the case of spinlock backed atomics the emulation is expected
+ * to be efficient, although less so than native atomics support.
  */
 #include "port/atomics/fallback.h"
 
@@ -400,8 +401,6 @@ pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
  * documentation.
  * ----
  */
-#ifdef PG_HAVE_ATOMIC_U64_SUPPORT
-
 static inline void
 pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
 {
@@ -485,8 +484,6 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
-#endif   /* PG_HAVE_64_BIT_ATOMICS */
-
 #undef INSIDE_ATOMICS_H
 
 #endif   /* ATOMICS_H */
diff --git a/src/include/port/atomics/arch-ia64.h b/src/include/port/atomics/arch-ia64.h
index 03cc8a0..b4ed9c4 100644
--- a/src/include/port/atomics/arch-ia64.h
+++ b/src/include/port/atomics/arch-ia64.h
@@ -24,3 +24,6 @@
 #elif defined(__hpux)
 #	define pg_memory_barrier_impl()		_Asm_mf()
 #endif
+
+/* per architecture manual doubleword accesses have single copy atomicity */
+#define PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY
diff --git a/src/include/port/atomics/arch-ppc.h b/src/include/port/atomics/arch-ppc.h
index 2b54c42..8635d82 100644
--- a/src/include/port/atomics/arch-ppc.h
+++ b/src/include/port/atomics/arch-ppc.h
@@ -24,3 +24,6 @@
 #define pg_read_barrier_impl()		__asm__ __volatile__ ("lwsync" : : : "memory")
 #define pg_write_barrier_impl()		__asm__ __volatile__ ("lwsync" : : : "memory")
 #endif
+
+/* per architecture manual doubleword accesses have single copy atomicity */
+#define PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY
diff --git a/src/include/port/atomics/arch-x86.h b/src/include/port/atomics/arch-x86.h
index 3dbabab..6792a9a 100644
--- a/src/include/port/atomics/arch-x86.h
+++ b/src/include/port/atomics/arch-x86.h
@@ -239,4 +239,14 @@ pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_)
 
 #endif /* defined(__GNUC__) || defined(__INTEL_COMPILER) */
 
+/*
+ * 8 byte reads / writes have single-copy atomicity on 32 bit x86 platforms
+ * since at least the 586, as well as on all x86-64 CPUs.
+ */
+#if defined(__i586__) || defined(__i686__) || /* gcc i586+ */  \
+	(defined(_M_IX86) && _M_IX86 >= 500) || /* msvc i586+ */ \
+	defined(__x86_64__) || defined(__x86_64) || defined(_M_X64) /* gcc, sunpro, msvc */
+#define PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY
+#endif /* 8 byte single-copy atomicity */
+
 #endif /* HAVE_ATOMICS */
diff --git a/src/include/port/atomics/fallback.h b/src/include/port/atomics/fallback.h
index bdaa795..80bcf00 100644
--- a/src/include/port/atomics/fallback.h
+++ b/src/include/port/atomics/fallback.h
@@ -102,6 +102,24 @@ typedef struct pg_atomic_uint32
 
 #endif /* PG_HAVE_ATOMIC_U32_SUPPORT */
 
+#if !defined(PG_HAVE_ATOMIC_U64_SUPPORT)
+
+#define PG_HAVE_ATOMIC_U64_SIMULATION
+
+#define PG_HAVE_ATOMIC_U64_SUPPORT
+typedef struct pg_atomic_uint64
+{
+	/* Check pg_atomic_flag's definition above for an explanation */
+#if defined(__hppa) || defined(__hppa__)	/* HP PA-RISC, GCC and HP compilers */
+	int			sema[4];
+#else
+	int			sema;
+#endif
+	volatile uint64 value;
+} pg_atomic_uint64;
+
+#endif /* PG_HAVE_ATOMIC_U64_SUPPORT */
+
 #ifdef PG_HAVE_ATOMIC_FLAG_SIMULATION
 
 #define PG_HAVE_ATOMIC_INIT_FLAG
@@ -141,3 +159,18 @@ extern bool pg_atomic_compare_exchange_u32_impl(volatile pg_atomic_uint32 *ptr,
 extern uint32 pg_atomic_fetch_add_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_);
 
 #endif /* PG_HAVE_ATOMIC_U32_SIMULATION */
+
+
+#ifdef PG_HAVE_ATOMIC_U64_SIMULATION
+
+#define PG_HAVE_ATOMIC_INIT_U64
+extern void pg_atomic_init_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val_);
+
+#define PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U64
+extern bool pg_atomic_compare_exchange_u64_impl(volatile pg_atomic_uint64 *ptr,
+												uint64 *expected, uint64 newval);
+
+#define PG_HAVE_ATOMIC_FETCH_ADD_U64
+extern uint64 pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_);
+
+#endif /* PG_HAVE_ATOMIC_U64_SIMULATION */
diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h
index 32a0113..a689c17 100644
--- a/src/include/port/atomics/generic.h
+++ b/src/include/port/atomics/generic.h
@@ -246,8 +246,6 @@ pg_atomic_sub_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 sub_)
 }
 #endif
 
-#ifdef PG_HAVE_ATOMIC_U64_SUPPORT
-
 #if !defined(PG_HAVE_ATOMIC_EXCHANGE_U64) && defined(PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U64)
 #define PG_HAVE_ATOMIC_EXCHANGE_U64
 static inline uint64
@@ -264,6 +262,26 @@ pg_atomic_exchange_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 xchg_)
 }
 #endif
 
+#if 0
+#ifndef PG_HAVE_ATOMIC_READ_U64
+#define PG_HAVE_ATOMIC_READ_U64
+static inline uint64
+pg_atomic_read_u64_impl(volatile pg_atomic_uint64 *ptr)
+{
+	return *(&ptr->value);
+}
+#endif
+
+#ifndef PG_HAVE_ATOMIC_WRITE_U64
+#define PG_HAVE_ATOMIC_WRITE_U64
+static inline void
+pg_atomic_write_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val)
+{
+	ptr->value = val;
+}
+#endif
+#endif
+
 #ifndef PG_HAVE_ATOMIC_WRITE_U64
 #define PG_HAVE_ATOMIC_WRITE_U64
 static inline void
@@ -379,5 +397,3 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_;
 }
 #endif
-
-#endif /* PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U64 */
diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c
index e7826a4..d20bbb7 100644
--- a/src/test/regress/regress.c
+++ b/src/test/regress/regress.c
@@ -1014,7 +1014,6 @@ test_atomic_uint32(void)
 		elog(ERROR, "pg_atomic_fetch_and_u32() #3 wrong");
 }
 
-#ifdef PG_HAVE_ATOMIC_U64_SUPPORT
 static void
 test_atomic_uint64(void)
 {
@@ -1090,7 +1089,6 @@ test_atomic_uint64(void)
 	if (pg_atomic_fetch_and_u64(&var, ~0) != 0)
 		elog(ERROR, "pg_atomic_fetch_and_u64() #3 wrong");
 }
-#endif   /* PG_HAVE_ATOMIC_U64_SUPPORT */
 
 
 PG_FUNCTION_INFO_V1(test_atomic_ops);
@@ -1113,9 +1111,7 @@ test_atomic_ops(PG_FUNCTION_ARGS)
 
 	test_atomic_uint32();
 
-#ifdef PG_HAVE_ATOMIC_U64_SUPPORT
 	test_atomic_uint64();
-#endif
 
 	PG_RETURN_BOOL(true);
 }
-- 
2.7.0.229.g701fa7f.dirty

From 0004d57c0d2923b997cbc5e590f4ce7d7ab177ac Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 31 Mar 2016 00:59:12 +0200
Subject: [PATCH 2/3] Increase max number of buffers in clog SLRU to 128.

Discussion: CAA4eK1+8=X9mSNeVeHg_NqMsOR-XKsjuqrYzQf=icsdh3u4...@mail.gmail.com
---
 src/backend/access/transam/clog.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 06aff18..57a7913 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -440,7 +440,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
 Size
 CLOGShmemBuffers(void)
 {
-	return Min(32, Max(4, NBuffers / 512));
+	return Min(128, Max(4, NBuffers / 512));
 }
 
 /*
-- 
2.7.0.229.g701fa7f.dirty

From 81d8ba39f4256cdfb6f3fbf0a5eec4f628f58d16 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 24 Mar 2016 23:48:30 +0100
Subject: [PATCH 3/3] Use a much more granular locking model for the clog SLRU.

Author: Simon Riggs, Andres Freund
Discussion: canp8+j+imqfhxkchfyfnxdyi6k-arazrv+zg-v_ofxetjjo...@mail.gmail.com
    CAA4eK1+8=X9mSNeVeHg_NqMsOR-XKsjuqrYzQf=icsdh3u4...@mail.gmail.com
---
 src/backend/access/transam/clog.c        |  87 ++++++++++++++++++---
 src/backend/access/transam/slru.c        | 130 ++++++++++++++++++++-----------
 src/backend/storage/lmgr/lwlocknames.txt |   1 +
 src/include/access/slru.h                |  17 ++--
 4 files changed, 177 insertions(+), 58 deletions(-)

diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 57a7913..a5ca20d 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -71,6 +71,12 @@
 #define GetLSNIndex(slotno, xid)	((slotno) * CLOG_LSNS_PER_PAGE + \
 	((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
 
+/*
+ * (Un-)defining USE_CONTENT_LOCK selects between two locking models for clog
+ * pages.  With it defined, we use per-page content locks; without it, every
+ * status modification is done using cmpxchg, without any content lock.
+ */
+//#define USE_CONTENT_LOCK
 
 /*
  * Link to shared-memory data structures for CLOG control
@@ -262,9 +268,16 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 		   status == TRANSACTION_STATUS_ABORTED ||
 		   (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
 
-	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
-
 	/*
+	 * We acquire CLogControlLock in share mode (via
+	 * SimpleLruReadPage_optShared), to prevent pages from being replaced.
+	 *
+	 * This is safe because all callers of these routines always check the xid
+	 * is complete first, either by checking TransactionIdIsInProgress() or by
+	 * waiting for the transaction to complete via the lock manager. As a
+	 * result, anybody checking visibility of our xids will never get as far
+	 * as checking the status here at the same time we are setting it.
+	 *
 	 * If we're doing an async commit (ie, lsn is valid), then we must wait
 	 * for any active write on the page slot to complete.  Otherwise our
 	 * update could reach disk in that write, which will not do since we
@@ -273,7 +286,18 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 	 * write-busy, since we don't care if the update reaches disk sooner than
 	 * we think.
 	 */
-	slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
+	slotno = SimpleLruReadPage_optShared(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
+
+	/*
+	 * We might have problems with concurrent writes, but we solve that by
+	 * acquiring an exclusive page lock to serialize commits and ensure there
+	 * is a memory barrier between commit writes. Note that we acquire and
+	 * release the lock for each page, to ensure that transactions with huge
+	 * numbers of subxacts don't hold up everyone else.
+	 */
+#ifdef USE_CONTENT_LOCK
+	LWLockAcquire(&ClogCtl->shared->buffer_content_locks[slotno].lock, LW_EXCLUSIVE);
+#endif
 
 	/*
 	 * Set the main transaction id, if any.
@@ -309,15 +333,24 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 		TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
 	}
 
+#ifndef USE_CONTENT_LOCK
+	/* prevent compiler from moving store up */
+	pg_compiler_barrier();
+#endif
+
 	ClogCtl->shared->page_dirty[slotno] = true;
 
+#ifdef USE_CONTENT_LOCK
+	LWLockRelease(&ClogCtl->shared->buffer_content_locks[slotno].lock);
+#endif
 	LWLockRelease(CLogControlLock);
 }
 
 /*
  * Sets the commit status of a single transaction.
  *
- * Must be called with CLogControlLock held
+ * Must be called with CLogControlLock held in (at least) share mode, and
+ * with the page's content lock held exclusively.
  */
 static void
 TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
@@ -350,12 +383,34 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
 			status != TRANSACTION_STATUS_IN_PROGRESS) ||
 		   curval == status);
 
-	/* note this assumes exclusive access to the clog page */
+#ifdef USE_CONTENT_LOCK
+	/* note this assumes exclusive write access to the clog page */
 	byteval = *byteptr;
+	pg_compiler_barrier(); /* prevent modification being done in-place */
 	byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
 	byteval |= (status << bshift);
+	pg_compiler_barrier(); /* prevent modification being done in-place */
 	*byteptr = byteval;
-
+#else
+	/*
+	 * Modify only the relevant bits, retry if byte has concurrently been
+	 * modified.
+	 *
+	 * XXX: Obviously this'd require some abstraction.
+	 */
+	while (true)
+	{
+		char		origval;
+
+		origval = byteval = *((volatile char *) byteptr);
+		pg_compiler_barrier();
+		byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
+		byteval |= (status << bshift);
+		if (__atomic_compare_exchange_n(byteptr, &origval, byteval, false,
+										__ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+			break;
+	}
+#endif
 	/*
 	 * Update the group LSN if the transaction completion LSN is higher.
 	 *
@@ -367,9 +422,23 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
 	if (!XLogRecPtrIsInvalid(lsn))
 	{
 		int			lsnindex = GetLSNIndex(slotno, xid);
+		pg_atomic_uint64 *group_lsn = &ClogCtl->shared->group_lsn[lsnindex];
 
-		if (ClogCtl->shared->group_lsn[lsnindex] < lsn)
-			ClogCtl->shared->group_lsn[lsnindex] = lsn;
+#ifdef USE_CONTENT_LOCK
+		/* we hold an exclusive write lock on the page */
+		if ((XLogRecPtr) pg_atomic_read_u64(group_lsn) < lsn)
+			pg_atomic_write_u64(group_lsn, (uint64) lsn);
+#else
+		uint64 old = pg_atomic_read_u64(group_lsn);
+
+		while (true)
+		{
+			if (old >= lsn)
+				break;
+			if (pg_atomic_compare_exchange_u64(group_lsn, &old, lsn))
+				break;
+		}
+#endif
 	}
 }
 
@@ -407,7 +476,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
 	status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
 
 	lsnindex = GetLSNIndex(slotno, xid);
-	*lsn = ClogCtl->shared->group_lsn[lsnindex];
+	*lsn = pg_atomic_read_u64(&ClogCtl->shared->group_lsn[lsnindex]);
 
 	LWLockRelease(CLogControlLock);
 
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 36a011c..52509dc 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -152,10 +152,11 @@ SimpleLruShmemSize(int nslots, int nlsns)
 	sz += MAXALIGN(nslots * sizeof(bool));		/* page_dirty[] */
 	sz += MAXALIGN(nslots * sizeof(int));		/* page_number[] */
 	sz += MAXALIGN(nslots * sizeof(int));		/* page_lru_count[] */
-	sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
+	sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_io_locks[] */
+	sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_content_locks[] */
 
 	if (nlsns > 0)
-		sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));	/* group_lsn[] */
+		sz += MAXALIGN(nslots * nlsns * sizeof(pg_atomic_uint64));	/* group_lsn[] */
 
 	return BUFFERALIGN(sz) + BLCKSZ * nslots;
 }
@@ -206,25 +207,42 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 
 		if (nlsns > 0)
 		{
-			shared->group_lsn = (XLogRecPtr *) (ptr + offset);
+			int i;
+
+			shared->group_lsn = (pg_atomic_uint64 *) (ptr + offset);
 			offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
+
+			for (i = 0; i < nslots * nlsns; i++)
+			{
+				pg_atomic_init_u64(&shared->group_lsn[i], 0);
+			}
 		}
 
 		/* Initialize LWLocks */
-		shared->buffer_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots);
+		shared->buffer_io_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots);
 
 		Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
-		strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
-		shared->lwlock_tranche_id = tranche_id;
-		shared->lwlock_tranche.name = shared->lwlock_tranche_name;
-		shared->lwlock_tranche.array_base = shared->buffer_locks;
-		shared->lwlock_tranche.array_stride = sizeof(LWLockPadded);
+		strlcpy(shared->lwlock_io_tranche_name, name, SLRU_MAX_NAME_LENGTH);
+		shared->lwlock_io_tranche_id = tranche_id;
+		shared->lwlock_io_tranche.name = shared->lwlock_io_tranche_name;
+		shared->lwlock_io_tranche.array_base = shared->buffer_io_locks;
+		shared->lwlock_io_tranche.array_stride = sizeof(LWLockPadded);
+
+		shared->buffer_content_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots);
+		Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
+		strlcpy(shared->lwlock_content_tranche_name, name, SLRU_MAX_NAME_LENGTH);
+		shared->lwlock_content_tranche_id = tranche_id;
+		shared->lwlock_content_tranche.name = shared->lwlock_content_tranche_name;
+		shared->lwlock_content_tranche.array_base = shared->buffer_content_locks;
+		shared->lwlock_content_tranche.array_stride = sizeof(LWLockPadded);
 
 		ptr += BUFFERALIGN(offset);
 		for (slotno = 0; slotno < nslots; slotno++)
 		{
-			LWLockInitialize(&shared->buffer_locks[slotno].lock,
-				shared->lwlock_tranche_id);
+			LWLockInitialize(&shared->buffer_io_locks[slotno].lock,
+				shared->lwlock_io_tranche_id);
+			LWLockInitialize(&shared->buffer_content_locks[slotno].lock,
+				shared->lwlock_content_tranche_id);
 
 			shared->page_buffer[slotno] = ptr;
 			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
@@ -237,7 +255,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 		Assert(found);
 
 	/* Register SLRU tranche in the main tranches array */
-	LWLockRegisterTranche(shared->lwlock_tranche_id, &shared->lwlock_tranche);
+	LWLockRegisterTranche(shared->lwlock_io_tranche_id, &shared->lwlock_io_tranche);
+	LWLockRegisterTranche(shared->lwlock_content_tranche_id, &shared->lwlock_content_tranche);
 
 	/*
 	 * Initialize the unshared control struct, including directory path. We
@@ -303,8 +322,16 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
 	SlruShared	shared = ctl->shared;
 
 	if (shared->lsn_groups_per_page > 0)
-		MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
-			   shared->lsn_groups_per_page * sizeof(XLogRecPtr));
+	{
+		int i;
+
+		for (i = slotno * shared->lsn_groups_per_page;
+			 i < (slotno + 1) * shared->lsn_groups_per_page;
+			 i++)
+		{
+			pg_atomic_init_u64(&shared->group_lsn[i], 0);
+		}
+	}
 }
 
 /*
@@ -321,8 +348,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
 
 	/* See notes at top of file */
 	LWLockRelease(shared->ControlLock);
-	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
-	LWLockRelease(&shared->buffer_locks[slotno].lock);
+	LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_SHARED);
+	LWLockRelease(&shared->buffer_io_locks[slotno].lock);
 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
 
 	/*
@@ -336,7 +363,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
 	if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
 		shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
 	{
-		if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
+		if (LWLockConditionalAcquire(&shared->buffer_io_locks[slotno].lock, LW_SHARED))
 		{
 			/* indeed, the I/O must have failed */
 			if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
@@ -346,7 +373,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
 				shared->page_status[slotno] = SLRU_PAGE_VALID;
 				shared->page_dirty[slotno] = true;
 			}
-			LWLockRelease(&shared->buffer_locks[slotno].lock);
+			LWLockRelease(&shared->buffer_io_locks[slotno].lock);
 		}
 	}
 }
@@ -415,7 +442,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 		shared->page_dirty[slotno] = false;
 
 		/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
-		LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
+		LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_EXCLUSIVE);
 
 		/* Release control lock while doing I/O */
 		LWLockRelease(shared->ControlLock);
@@ -435,7 +462,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 
 		shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
 
-		LWLockRelease(&shared->buffer_locks[slotno].lock);
+		LWLockRelease(&shared->buffer_io_locks[slotno].lock);
 
 		/* Now it's okay to ereport if we failed */
 		if (!ok)
@@ -457,36 +484,51 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
  * Return value is the shared-buffer slot number now holding the page.
  * The buffer's LRU access info is updated.
  *
- * Control lock must NOT be held at entry, but will be held at exit.
- * It is unspecified whether the lock will be shared or exclusive.
+ * Control lock must NOT be held at entry, but will be held in share mode at
+ * exit.
  */
 int
 SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
 {
+	return SimpleLruReadPage_optShared(ctl, pageno, true, xid);
+}
+
+int
+SimpleLruReadPage_optShared(SlruCtl ctl, int pageno, bool write_ok,
+						   TransactionId xid)
+{
 	SlruShared	shared = ctl->shared;
 	int			slotno;
 
-	/* Try to find the page while holding only shared lock */
-	LWLockAcquire(shared->ControlLock, LW_SHARED);
-
-	/* See if page is already in a buffer */
-	for (slotno = 0; slotno < shared->num_slots; slotno++)
+	while (true)
 	{
-		if (shared->page_number[slotno] == pageno &&
-			shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
-			shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
+		/* Try to find the page while holding only shared lock */
+		LWLockAcquire(shared->ControlLock, LW_SHARED);
+
+		/* See if page is already in a buffer */
+		for (slotno = 0; slotno < shared->num_slots; slotno++)
 		{
-			/* See comments for SlruRecentlyUsed macro */
-			SlruRecentlyUsed(shared, slotno);
-			return slotno;
+			if (shared->page_number[slotno] == pageno &&
+				shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
+				shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS &&
+				(write_ok ||
+				 shared->page_status[slotno] != SLRU_PAGE_WRITE_IN_PROGRESS))
+			{
+				/* See comments for SlruRecentlyUsed macro */
+				SlruRecentlyUsed(shared, slotno);
+				return slotno;
+			}
 		}
+
+		/* No luck, so switch to normal exclusive lock and do regular read */
+		LWLockRelease(shared->ControlLock);
+		LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+
+		SimpleLruReadPage(ctl, pageno, write_ok, xid);
+
+		/* release lock and try again, to avoid holding the lock exclusively */
+		LWLockRelease(shared->ControlLock);
 	}
-
-	/* No luck, so switch to normal exclusive lock and do regular read */
-	LWLockRelease(shared->ControlLock);
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
-
-	return SimpleLruReadPage(ctl, pageno, true, xid);
 }
 
 /*
@@ -531,7 +573,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
 	shared->page_dirty[slotno] = false;
 
 	/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
-	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
+	LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_EXCLUSIVE);
 
 	/* Release control lock while doing I/O */
 	LWLockRelease(shared->ControlLock);
@@ -560,7 +602,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
 
 	shared->page_status[slotno] = SLRU_PAGE_VALID;
 
-	LWLockRelease(&shared->buffer_locks[slotno].lock);
+	LWLockRelease(&shared->buffer_io_locks[slotno].lock);
 
 	/* Now it's okay to ereport if we failed */
 	if (!ok)
@@ -738,11 +780,11 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
 					lsnoff;
 
 		lsnindex = slotno * shared->lsn_groups_per_page;
-		max_lsn = shared->group_lsn[lsnindex++];
+		max_lsn = (XLogRecPtr) pg_atomic_read_u64(&shared->group_lsn[lsnindex++]);
 		for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
 		{
-			XLogRecPtr	this_lsn = shared->group_lsn[lsnindex++];
-
+			XLogRecPtr	this_lsn = (XLogRecPtr)
+				pg_atomic_read_u64(&shared->group_lsn[lsnindex++]);
 			if (max_lsn < this_lsn)
 				max_lsn = this_lsn;
 		}
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index c557cb6..8d33644 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -46,3 +46,4 @@ CommitTsControlLock					38
 CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
+CLogModifyLock				42
diff --git a/src/include/access/slru.h b/src/include/access/slru.h
index 5fcebc5..fb8a325 100644
--- a/src/include/access/slru.h
+++ b/src/include/access/slru.h
@@ -14,6 +14,7 @@
 #define SLRU_H
 
 #include "access/xlogdefs.h"
+#include "port/atomics.h"
 #include "storage/lwlock.h"
 
 
@@ -81,7 +82,7 @@ typedef struct SlruSharedData
 	 * highest LSN known for a contiguous group of SLRU entries on that slot's
 	 * page.
 	 */
-	XLogRecPtr *group_lsn;
+	pg_atomic_uint64 *group_lsn;
 	int			lsn_groups_per_page;
 
 	/*----------
@@ -103,10 +104,14 @@ typedef struct SlruSharedData
 	int			latest_page_number;
 
 	/* LWLocks */
-	int			lwlock_tranche_id;
-	LWLockTranche lwlock_tranche;
-	char		lwlock_tranche_name[SLRU_MAX_NAME_LENGTH];
-	LWLockPadded *buffer_locks;
+	int			lwlock_io_tranche_id;
+	LWLockTranche lwlock_io_tranche;
+	char		lwlock_io_tranche_name[SLRU_MAX_NAME_LENGTH];
+	LWLockPadded *buffer_io_locks;
+	int			lwlock_content_tranche_id;
+	LWLockTranche lwlock_content_tranche;
+	char		lwlock_content_tranche_name[SLRU_MAX_NAME_LENGTH];
+	LWLockPadded *buffer_content_locks;
 } SlruSharedData;
 
 typedef SlruSharedData *SlruShared;
@@ -150,6 +155,8 @@ extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 				  TransactionId xid);
 extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno,
 						   TransactionId xid);
+extern int SimpleLruReadPage_optShared(SlruCtl ctl, int pageno,
+						   bool write_ok, TransactionId xid);
 extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
 extern void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied);
 extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);
-- 
2.7.0.229.g701fa7f.dirty
