On Tue, Apr 19, 2016 at 6:11 PM, Kevin Grittner <kgri...@gmail.com> wrote:
> On Tue, Apr 19, 2016 at 9:57 AM, Amit Kapila <amit.kapil...@gmail.com> wrote:
>> On Sun, Apr 17, 2016 at 2:26 AM, Andres Freund <and...@anarazel.de> wrote:
>>> On 2016-04-16 16:44:52 -0400, Noah Misch wrote:
>>> > That is more controversial than the potential ~2% regression for
>>> > old_snapshot_threshold=-1.  Alvaro[2] and Robert[3] are okay releasing
>>> > that way, and Andres[4] is not.
>>> FWIW, I could be kinda convinced that it's temporarily ok, if there'd be
>>> a clear proposal on the table how to solve the scalability issue around
>>> MaintainOldSnapshotTimeMapping().
>> It seems that for read-only workloads, MaintainOldSnapshotTimeMapping()
>> takes EXCLUSIVE LWLock which seems to be a probable reason for a performance
>> regression.  Now, here the question is do we need to acquire that lock if
>> xmin is not changed since the last time value of
>> oldSnapshotControl->latest_xmin is updated or xmin is lesser than equal to
>> oldSnapshotControl->latest_xmin?
>> If we don't need it for above cases, I think it can address the performance
>> regression to a good degree for read-only workloads when the feature is
>> enabled.
> Thanks, Amit -- I think something along those lines is the right
> solution to the scaling issues when the feature is enabled.  For
> now I'm focusing on the back-patching issues and the performance
> regression when the feature is disabled, but I'll shift focus to
> this once the "killer" issues are in hand.

I had an idea I wanted to test out. The gist of it is to effectively
keep the last slot of the timestamp-to-xid map in the latest_xmin
field and only update the mapping when a slot boundary is crossed.
See the attached WIP patch for details. This way the exclusive lock
only needs to be acquired once per minute. The common case takes just
a spinlock, which could be replaced with atomics later. It also seems
to me that mutex_threshold, taken in TestForOldSnapshot(), can get
pretty hot under some workloads, so that may need some tweaking too.
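
A minimal sketch of the slot-boundary idea, written as a toy Python
model rather than the actual C code. The class and attribute names are
hypothetical, timestamps are assumed to be in microseconds, and plain
integer comparison stands in for the wraparound-aware xid comparison
and for the locking that the real code needs:

```python
MINUTE_US = 60 * 1000 * 1000  # assumption: timestamps are in microseconds


def align_to_minute(ts_us):
    """Round a timestamp down to the start of its minute."""
    return ts_us - (ts_us % MINUTE_US)


class SnapshotTimeMapModel(object):
    """Toy model: the newest slot lives in latest_xmin, not in the map."""

    def __init__(self):
        self.latest_xmin = 0       # newest xmin seen by any caller
        self.next_map_update = 0   # minute that latest_xmin belongs to
        self.time_map = []         # (minute, xmin) pairs, the slow-path data
        self.map_updates = 0       # how often the slow path was taken

    def maintain(self, when_taken_us, xmin):
        ts = align_to_minute(when_taken_us)

        # Fast path: in the patch this part runs under a spinlock.
        crossed = ts > self.next_map_update
        if crossed:
            self.next_map_update = ts
        if xmin > self.latest_xmin:
            self.latest_xmin = xmin
        if not crossed:
            return False

        # Slow path: in the patch this part takes the exclusive LWLock.
        self.map_updates += 1
        self.time_map.append((ts, xmin))
        return True


model = SnapshotTimeMapModel()
for sec in range(180):  # one snapshot per second for three minutes
    model.maintain((60 + sec) * 1000000, 100 + sec)
print(model.map_updates)
```

In this model the 180 calls reach the slow path three times, once per
minute boundary; every other call only touches the two fast-path
fields.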

I think a better approach would be to base the whole mechanism on a
periodically updated counter instead of timestamps. The autovacuum
launcher looks like a good candidate to play the clock keeper; without
it the feature has little point anyway. AFAICS only the clock keeper
needs to have the timestamp-to-xid mapping; the others can make do
with a couple of periodically updated values. I haven't worked it out
in detail, but it feels like the code would be simpler. This was a
larger change than I felt comfortable trying out, though, so I went
with the simple change first.
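
One possible reading of the counter idea, as a toy Python model. None
of this is worked out in the text above, so every name here is
hypothetical and the design is only an assumption: a single clock
keeper ticks once per interval, keeps the per-tick xmin history to
itself, and publishes just a counter and a threshold xid for everyone
else to read.

```python
from collections import deque


class ClockKeeper(object):
    """Hypothetical clock keeper; threshold_ticks must be at least 1."""

    def __init__(self, threshold_ticks):
        self.threshold_ticks = threshold_ticks
        self.epoch = 0             # published: number of ticks so far
        self.threshold_xid = None  # published: xmin from threshold_ticks ago
        # Private history: the xmin recorded at each of the last N ticks.
        self._xmins = deque(maxlen=threshold_ticks)

    def tick(self, current_xmin):
        # Once the history is full, its oldest entry is exactly
        # threshold_ticks old and becomes the published threshold.
        if len(self._xmins) == self.threshold_ticks:
            self.threshold_xid = self._xmins[0]
        self._xmins.append(current_xmin)
        self.epoch += 1


def snapshot_too_old(keeper, snapshot_epoch):
    """A reader only compares its remembered epoch with the counter."""
    return keeper.epoch - snapshot_epoch > keeper.threshold_ticks


keeper = ClockKeeper(threshold_ticks=3)
for xmin in (10, 20, 30, 40, 50):
    keeper.tick(xmin)
print(keeper.epoch, keeper.threshold_xid)
```

Readers never touch the history in this sketch; they remember the
epoch at which their snapshot was taken and later compare it against
the published counter.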

However, while checking whether my proof-of-concept patch actually
works, I hit another issue: I couldn't get my test for the feature to
work. The test script I used is attached. Basically I have a table
with 1000 rows, one high-throughput worker deleting old rows and
inserting new ones, one long query that acquires a snapshot and sleeps
for 30min, and one worker that has a repeatable read snapshot and
periodically does count(*) on the table. Based on the documentation I
would expect the following:

* The interfering query gets cancelled
* The long running query gets to run
* Old rows will start to be cleaned up after the threshold expires.

However, testing on commit 9c75e1a36b6b2f3ad9f76ae661f42586c92c6f7c,
I'm seeing that the old rows do not get cleaned up, and that I'm only
seeing the interfering query get cancelled when old_snapshot_threshold
= 0. Larger values do not result in cancellation. Am I doing something
wrong or is the feature just not working at all?
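
When checking whether the interfering query was cancelled for the
expected reason, it can help to tell the snapshot-too-old error apart
from other failures. A small sketch, assuming the exception object
carries the SQLSTATE in a pgcode attribute (as psycopg2 errors do) and
that the server reports SQLSTATE 72000 for this error; the helper name
is hypothetical:

```python
SNAPSHOT_TOO_OLD = "72000"  # SQLSTATE for "snapshot too old"


def is_snapshot_too_old(exc):
    """Return True if exc looks like a snapshot-too-old error."""
    return getattr(exc, "pgcode", None) == SNAPSHOT_TOO_OLD
```

In the test script this could be called from the except block of the
interfering query to separate the expected cancellation from, say, a
lost connection.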

Ants Aasma
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 8aa1f49..dc00d91 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -80,8 +80,11 @@ typedef struct OldSnapshotControlData
 	slock_t		mutex_current;			/* protect current timestamp */
 	int64		current_timestamp;		/* latest snapshot timestamp */
-	slock_t		mutex_latest_xmin;		/* protect latest snapshot xmin */
+	slock_t		mutex_latest_xmin;		/* protect latest snapshot xmin
+										 * and next_map_update
+										 */
 	TransactionId latest_xmin;			/* latest snapshot xmin */
+	int64		next_map_update;		/* latest snapshot valid up to */
 	slock_t		mutex_threshold;		/* protect threshold fields */
 	int64		threshold_timestamp;	/* earlier snapshot is old */
 	TransactionId threshold_xid;		/* earlier xid may be gone */
@@ -95,7 +98,9 @@ typedef struct OldSnapshotControlData
 	 * count_used value of old_snapshot_threshold means that the buffer is
 	 * full and the head must be advanced to add new entries.  Use timestamps
 	 * aligned to minute boundaries, since that seems less surprising than
-	 * aligning based on the first usage timestamp.
+	 * aligning based on the first usage timestamp. The latest bucket is
+	 * effectively stored within latest_xmin. The circular buffer is updated
+	 * when we get a new xmin value that doesn't fall into the same interval.
 	 * It is OK if the xid for a given time slot is from earlier than
 	 * calculated by adding the number of minutes corresponding to the
@@ -269,6 +274,7 @@ SnapMgrInit(void)
 		oldSnapshotControl->current_timestamp = 0;
 		oldSnapshotControl->latest_xmin = InvalidTransactionId;
+		oldSnapshotControl->next_map_update = 0;
 		oldSnapshotControl->threshold_timestamp = 0;
 		oldSnapshotControl->threshold_xid = InvalidTransactionId;
@@ -1594,9 +1600,15 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 		int64		ts = GetSnapshotCurrentTimestamp();
 		TransactionId xlimit = recentXmin;
-		TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
+		TransactionId latest_xmin;
+		int64		update_ts;
 		bool		same_ts_as_threshold = false;
+		SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+		latest_xmin = oldSnapshotControl->latest_xmin;
+		update_ts = oldSnapshotControl->next_map_update;
+		SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
 		 * Zero threshold always overrides to latest xmin, if valid.  Without
 		 * some heuristic it will find its own snapshot too old on, for
@@ -1631,6 +1643,14 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 		if (!same_ts_as_threshold)
+			if (ts == update_ts)
+			{
+				xlimit = latest_xmin;
+				if (NormalTransactionIdFollows(xlimit, recentXmin))
+					SetOldSnapshotThresholdTimestamp(ts, xlimit);
+			}
+			else
+			{
 			LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
 			if (oldSnapshotControl->count_used > 0
@@ -1651,6 +1671,7 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+			}
@@ -1680,17 +1701,36 @@ void
 MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
 	int64		ts;
+	TransactionId latest_xmin;
+	int64		update_ts;
+	bool		map_update_required = false;
 	/* Fast exit when old_snapshot_threshold is not used. */
 	if (old_snapshot_threshold < 0)
-	/* Keep track of the latest xmin seen by any process. */
+	ts = AlignTimestampToMinuteBoundary(whenTaken);
+	/*
+	 * Keep track of the latest xmin seen by any process. Update mapping
+	 * with a new value when we have crossed a bucket boundary.
+	 */
-	if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+	latest_xmin = oldSnapshotControl->latest_xmin;
+	update_ts = oldSnapshotControl->next_map_update;
+	if (ts > update_ts)
+	{
+		oldSnapshotControl->next_map_update = ts;
+		map_update_required = true;
+	}
+	if (TransactionIdFollows(xmin, latest_xmin))
 		oldSnapshotControl->latest_xmin = xmin;
+	/* We only needed to update the most recent xmin value. */
+	if (!map_update_required)
+		return;
 	/* No further tracking needed for 0 (used for testing). */
 	if (old_snapshot_threshold == 0)
@@ -1716,8 +1756,6 @@ MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
-	ts = AlignTimestampToMinuteBoundary(whenTaken);
 	LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
 	Assert(oldSnapshotControl->head_offset >= 0);
import psycopg2
import threading
from time import time, sleep
from datetime import datetime
import sys

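TBL_SIZE = 1000
# Assumed values: the definitions of these two constants are missing from
# the archived copy of the script.
PADDING_SIZE = 100      # bytes of padding per row (assumption)
DISPLAY_INTERVAL = 60   # seconds between status lines (assumption)
CONNSTRING = sys.argv[1] if len(sys.argv) > 1 else ""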

def connect(name):
    conn = psycopg2.connect("%s application_name=%s" % (CONNSTRING, name))
    return conn

def init_high_tp(cur):
    cur.execute("DROP TABLE IF EXISTS high_throughput")    
    cur.execute("CREATE TABLE high_throughput (id int4 primary key, padding text)")
    cur.execute("INSERT INTO high_throughput SELECT x, repeat(' ', %d) FROM generate_series(1,%d) x" % (PADDING_SIZE, TBL_SIZE))

def show(msg):
    print("%s %s" % (datetime.now().strftime("[%H:%M:%S] "), msg))

def high_tp_thread():
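    conn = connect("write-workload")
    # Assumption: commit every statement so that deleted rows actually
    # become dead and vacuumable; the archived copy never commits.
    conn.autocommit = True
    cur = conn.cursor()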
    cur.execute("SHOW old_snapshot_threshold")
    row = cur.fetchone()
    show("old_snapshot_threshold = %s" % (row[0],))

    last_display = 0
    i = 1
    start = time()
    while True:
        cur_time = time()
        if cur_time - last_display > DISPLAY_INTERVAL:
            last_display = cur_time
            cur.execute("SELECT pg_table_size('high_throughput'), clock_timestamp() - last_autovacuum FROM pg_stat_user_tables WHERE relname = 'high_throughput'")
            row = cur.fetchone()
            show("High throughput table size @ %5ds. Size %6dkB Last vacuum %s ago" % (int(cur_time - start), row[0]/1024,row[1],))

        cur.execute("DELETE FROM high_throughput WHERE id = %s", (i,))
        cur.execute("INSERT INTO high_throughput VALUES (%s, REPEAT(' ',%s))", (i+TBL_SIZE, PADDING_SIZE))
        i += 1

def long_ss_thread(interval=1800):
    conn = connect("long-unrelated-query")
    cur = conn.cursor()
    while True:
        show("Starting %ds long query" % interval)
        try:
            cur.execute("SELECT NOW(), pg_sleep(%s)", (interval,))
        except psycopg2.Error as e:
            show("Long query canceled due to %s" % (e,))
            conn.rollback()  # leave the failed transaction before retrying

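def long_ss_error_thread():
    conn = connect("interfering-query")
    # This worker is meant to hold a repeatable read snapshot; the exact
    # call used in the original script is not preserved, so this line is
    # an assumption.
    conn.set_session(isolation_level="REPEATABLE READ", autocommit=False)
    cur = conn.cursor()
    while True:
        try:
            while True:
                cur.execute("SELECT COUNT(*), MAX(id) FROM high_throughput")
                row = cur.fetchone()
                show("Counted %d rows with max %d in high_throughput table" % (row[0],row[1],))
                sleep(1)  # assumed polling interval
        except psycopg2.Error as e:
            show("Interfering query got error %s" % (e,))
            conn.rollback()  # drop the old snapshot before restarting
            show("Waiting 3min to restart interfering query")
            sleep(180)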

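# Assumed setup step: the archived copy defines init_high_tp() but never
# calls it.
init_conn = connect("init")
init_high_tp(init_conn.cursor())
init_conn.commit()
init_conn.close()

threads = []
for parallel_func in [high_tp_thread, long_ss_thread, long_ss_error_thread]:
    t = threading.Thread(target=parallel_func)
    t.start()
    threads.append(t)

for t in threads:
    t.join()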
