On 16.02.2018 11:59, Michail Nikolaev wrote:
Hello.

Just want to note that this work also relates to the https://www.postgresql.org/message-id/CAEepm%3D18buPTwNWKZMrAXLqja1Tvezw6sgFJKPQ%2BsFFTuwM0bQ%40mail.gmail.com paper. It provides a more general way to address the issue compared to individual optimisations (but those could do the job too, of course).


Certainly, a contention-aware lock scheduler is a much more general way of addressing this problem. But the amount of effort needed to implement such a scheduler and integrate it into the Postgres core is hardly comparable.

I did some more tests with a much simpler benchmark than YCSB: it just executes a specified percentage of single-record updates and selects against a relatively small table. I used 50% updates, a 100-record table, and varied the number of connections from 10 to 1000 in steps of 10. This benchmark (pgrw.c) and an updated version of the xlock patch are attached to this mail.
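
For reference, the benchmark works on a single small table and issues one of two single-row statements per iteration (taken verbatim from the attached pgrw.c):

    create table t(k integer primary key, v integer);
    insert into t values (generate_series(1, 100), 0);

    update t set v=v+1 where k=$1;   -- chosen with the probability given by -u
    select v from t where k=$1;      -- otherwise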

Results are available in the following spreadsheet:

https://docs.google.com/spreadsheets/d/1QOYfUehy8U3sdasMjGnPGQJY8JiRfZmlS64YRBM0YTo/edit?usp=sharing

I repeated the tests several times. Dips on the chart correspond to autovacuum periods; for some reason they are larger with the xlock optimization. Still, you can see that the xlock optimization significantly reduces the speed of degradation, although it doesn't completely eliminate the negative trend.

Thanks,
Michail.


Thu, 15 Feb 2018 at 19:00, Konstantin Knizhnik <k.knizh...@postgrespro.ru>:

    Hi,

    PostgreSQL performance degrades significantly under high contention.
    You can look at the attached YCSB results (ycsb-zipf-pool.png) to
    estimate the level of this degradation.

    Postgres acquires two kinds of heavyweight locks during an update: it
    locks the TID of the updated tuple and the XID of the transaction
    that created this version.
    In the debugger I see the following picture: if several transactions
    are trying to update the same record, then first of all they compete
    for the SnapshotDirty.xmax transaction lock in EvalPlanQualFetch.
    Then in heap_update they try to lock the TID of the updated tuple:
    heap_acquire_tuplock in heapam.c.
    After that, transactions wait for completion of the transaction that
    updated the tuple: XactLockTableWait in heapam.c.

    So in heap_acquire_tuplock all competing transactions wait for the
    TID of the updated version. When the transaction that changed this
    tuple commits, one of the competitors is granted the lock and
    proceeds, creating a new version of the tuple. Then all the other
    competitors are woken up and ... find out that the tuple they locked
    is no longer the last version.
    They then locate the new version and try to lock it... The more
    competitors we have, the more attempts they all have to perform
    (quadratic complexity).
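    (Roughly: if N transactions pile up on the same row, the first commit
    wakes N-1 waiters, the next one N-2, and so on, so the total number
    of wake-up/re-lock attempts is about (N-1) + (N-2) + ... + 1 =
    N(N-1)/2, i.e. it grows quadratically with the number of competitors.)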

    My idea was that we can avoid such performance degradation if we
    somehow queue the competing transactions so that they get control
    one by one, without doing useless work.
    First of all I tried to replace the TID lock with a PK (primary key)
    lock. Unlike the TID, the PK of a record is not changed during a HOT
    update. The second idea is that instead of immediately releasing the
    lock after the update, we can hold it until the end of the
    transaction. This optimization really does improve performance: it
    corresponds to the pg_f1_opt configuration in the attached diagram.
    For some workloads it provides up to a two times improvement compared
    with vanilla Postgres. But there are many problems with a correct
    (non-prototype) implementation of this approach: handling different
    types of PK, including compound keys, PK updates, ...
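
    (Just as an illustration of the intended behaviour, not of the actual
    patch: a rough SQL-level analogue of "lock the primary key and keep
    the lock until end of transaction" is a transaction-scoped advisory
    lock keyed on the PK value, for example

        begin;
        select pg_advisory_xact_lock(hashtext('t'), 42);  -- serialize writers of key 42
        update t set v = v + 1 where k = 42;
        commit;

    The prototype does the equivalent inside the server, which is where
    the PK-type handling problems mentioned above come from.)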

    Another approach, which I have recently implemented, is much simpler
    and addresses another lock kind: the transaction lock.
    The idea of this approach is mostly the same: all competing
    transactions are waiting for the transaction that is changing the
    tuple. Then one of them is given a chance to proceed, and now all the
    other transactions are waiting for this transaction, and so on:

    T1<-T2,T3,T4,T5,T6,T7,T8,T9
    T2<-T3,T4,T5,T6,T7,T8,T9
    T3<-T4,T5,T6,T7,T8,T9
    ...

    Here <- denotes a "waits for" dependency between transactions.

    If we change this picture to

    T1<-T2, T2<-T3, T3<-T4, T4<-T5, T5<-T6, T6<-T7, T7<-T8, T8<-T9

    then the number of lock requests can be significantly reduced.
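    (In the nine-transaction example above, the fan-in picture contains
    8 + 7 + ... + 1 = 36 wait-for edges, while the chain contains only 8,
    so the number of lock waits grows linearly rather than quadratically
    with the number of competitors.)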
    And it can be implemented with a very simple patch (attached
    xlock.patch).
    I hope that I have not done something incorrect here.
    The effect of this simple patch is even larger: more than 3 times for
    50..70 clients.
    Please look at the attached xlock.pdf spreadsheet.

    Unfortunately, the combination of these two approaches gives a worse
    result than xlock.patch alone.
    Certainly this patch also requires further improvement; for example,
    it will not work correctly when transactions are aborted due to a
    deadlock or for some other reason.
    I just want to know the opinion of the community on whether the
    proposed approaches to reducing contention are really promising and
    whether it makes sense to continue work in this direction.

    --
    Konstantin Knizhnik
    Postgres Professional: http://www.postgrespro.com
    The Russian Postgres Company


--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
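
[attachment: xlock.patch]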

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index dc3d8d9817..e9aaf27460 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -53,6 +53,8 @@
 /* This configuration variable is used to set the lock table size */
 int			max_locks_per_xact; /* set by guc.c */
 
+bool        xact_lock_chaining;
+
 #define NLOCKENTS() \
 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
 
@@ -919,6 +921,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		else
 			return LOCKACQUIRE_NOT_AVAIL;
 	}
+
 	locallock->proclock = proclock;
 	lock = proclock->tag.myLock;
 	locallock->lock = lock;
@@ -949,7 +952,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		 * blocking, remove useless table entries and return NOT_AVAIL without
 		 * waiting.
 		 */
-		if (dontWait)
+		if (dontWait || (xact_lock_chaining
+				         && locktag->locktag_type == LOCKTAG_TRANSACTION
+						 && lockmode == ShareLock
+						 && TransactionIdIsValid(lock->lastWaitingXid)))
 		{
 			AbortStrongLockAcquire();
 			if (proclock->holdMask == 0)
@@ -973,12 +979,23 @@ LockAcquireExtended(const LOCKTAG *locktag,
 			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
 			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
 			Assert(lock->nGranted <= lock->nRequested);
+			if (!dontWait)
+			{
+				SET_LOCKTAG_TRANSACTION(*(LOCKTAG *)locktag, lock->lastWaitingXid);
+				lock->lastWaitingXid = GetCurrentTransactionIdIfAny();
+				LWLockRelease(partitionLock);
+				if (locallock->nLocks == 0)
+					RemoveLocalLock(locallock);
+				return LockAcquire(locktag,
+								   lockmode,
+								   sessionLock,
+								   dontWait);
+			}
 			LWLockRelease(partitionLock);
 			if (locallock->nLocks == 0)
 				RemoveLocalLock(locallock);
 			return LOCKACQUIRE_NOT_AVAIL;
 		}
-
 		/*
 		 * Set bitmask of locks this process already holds on this object.
 		 */
@@ -995,6 +1012,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
 										 locktag->locktag_type,
 										 lockmode);
 
+		if (xact_lock_chaining
+			&& locktag->locktag_type == LOCKTAG_TRANSACTION
+			&& lockmode == ShareLock)
+		{
+			lock->lastWaitingXid = GetCurrentTransactionIdIfAny();
+		}
 		WaitOnLock(locallock, owner);
 
 		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
@@ -1098,6 +1121,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
 		LOCK_PRINT("LockAcquire: new", lock, lockmode);
+		lock->lastWaitingXid = InvalidTransactionId;
 	}
 	else
 	{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 87ba67661a..f62909fda3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -804,7 +804,7 @@ static const unit_conversion time_unit_conversion_table[] =
 /******** option records follow ********/
 
 static struct config_bool ConfigureNamesBool[] =
-{
+{	
 	{
 		{"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of sequential-scan plans."),
@@ -1705,6 +1705,17 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"xact_lock_chaining", PGC_SIGHUP, LOCK_MANAGEMENT,
+			gettext_noop("Minimize lock contention by chaining xact locks"),
+			NULL
+		},
+		&xact_lock_chaining,
+		true,
+		NULL, NULL, NULL
+	},
+
+	
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 777da71679..51ad120180 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -35,6 +35,7 @@ typedef struct PROC_QUEUE
 
 /* GUC variables */
 extern int	max_locks_per_xact;
+extern bool xact_lock_chaining;
 
 #ifdef LOCK_DEBUG
 extern int	Trace_lock_oidmin;
@@ -295,6 +296,7 @@ typedef struct LOCK
 	int			nRequested;		/* total of requested[] array */
 	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
 	int			nGranted;		/* total of granted[] array */
+	TransactionId lastWaitingXid;
 } LOCK;
 
 #define LOCK_LOCKMETHOD(lock) ((LOCKMETHODID) (lock).tag.locktag_lockmethodid)
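
/*
 * pgrw.c: simple read/write benchmark used for the measurements above.
 * Each client thread opens its own connection and repeatedly updates or
 * selects a random row of a small table t(k integer primary key, v integer).
 */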
#include <time.h>
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include <sys/time.h>
#include <pthread.h>
#include <libpq-fe.h>

#define MAX_THREADS 1024
#define MAX_ATTEMPTS 10
#define INT8OID 20

char const* connection = "dbname=postgres host=localhost port=5432 sslmode=disable connect_timeout=10";
int update_percent = 1;
int n_records = 100;
int n_clients = 100;
long selects[MAX_THREADS];
long updates[MAX_THREADS];
volatile int termination;
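
/*
 * worker: body of one client thread.  Connects, prepares the update and
 * select statements once, then loops picking a random key and running one
 * of them until main() sets the termination flag.
 */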

void* worker(void* arg)
{
	PGconn* con;
	PGresult *res;
	ConnStatusType status;
	Oid paramTypes[1] = { INT8OID };
	size_t id = (size_t)arg;
	int i;
	for (i = 0; i < MAX_ATTEMPTS; i++)
	{
		con = PQconnectdb(connection);
		status = PQstatus(con);
		if (status == CONNECTION_OK)
			break;
		PQfinish(con);
	}
	if (status != CONNECTION_OK)
	{
		fprintf(stderr, "Could not establish connection to server %s, error = %s",
				connection, PQerrorMessage(con));
		exit(1);
	}
	PQprepare(con,
			  "update",
			  //"update t set v=v+1 from (select k from t where k=$1 for no key update) q where t.k=q.k",
			  "update t set v=v+1 where k=$1",
			  1,
			  paramTypes);
	PQprepare(con,
			  "select",
			  "select v from t where k=$1",
			  1,
			  paramTypes);

	while (!termination) {
		char key[64];
		char const* paramValues[] = {key};
		sprintf(key, "%d", rand() % n_records + 1);
		if (rand() % 100 < update_percent) {
			res = PQexecPrepared(con, "update", 1, paramValues, NULL, NULL, 0);
			if (PQresultStatus(res) != PGRES_COMMAND_OK) {
				fprintf(stderr, "Update failed: %s\n", PQresultErrorMessage(res));
				exit(1);
			}
			if (strcmp(PQcmdTuples(res), "1") != 0) {
				fprintf(stderr, "Update affect wrong number of tuples: %s\n", PQcmdTuples(res));
				exit(1);
			}
			updates[id] += 1;
		} else {
			res = PQexecPrepared(con, "select", 1, paramValues, NULL, NULL, 0);
			if (PQresultStatus(res) != PGRES_TUPLES_OK) {
				fprintf(stderr, "Select failed: %s\n", PQresultErrorMessage(res));
				exit(1);
			}
			if (PQntuples(res) != 1) {
				fprintf(stderr, "Select returns wrong number of tuples: %d\n", PQntuples(res));
				exit(1);
			}
			selects[id] += 1;
		}
		PQclear(res);
	}
	PQfinish(con);
	return 0;
}



int main (int argc, char* argv[])
{
	int i;
	pthread_t threads[MAX_THREADS];
	int test_duration = 10;
	long thread_updates[MAX_THREADS];
	long thread_selects[MAX_THREADS];
	time_t finish;
	int iteration = 0;
	int initialize = 0;
	long total_selects = 0;
	long total_updates = 0;
	int verbose = 0;
	if (argc == 1) {
		fprintf(stderr, "Use -h to show usage options\n");
		return 1;
	}

	for (i = 1; i < argc; i++) {
		if (argv[i][0] == '-') {
			switch (argv[i][1]) {
			  case 'n':
				n_records = atol(argv[++i]);
				continue;
			  case 'c':
				n_clients = atoi(argv[++i]);
				continue;
			  case 'u':
				update_percent = atoi(argv[++i]);
				continue;
			  case 't':
				test_duration = atoi(argv[++i]);
				continue;
			  case 'v':
				verbose = 1;
				continue;
			  case 'd':
				connection = argv[++i];
				continue;
			  case 'i':
				initialize = 1;
				continue;
			}
		}
		printf("Options:\n"
			   "\t-i\tinitialize database\n"
			   "\t-n\tnumber of records (default 100)\n"
			   "\t-c\tnumber of clients (default 100)\n"
			   "\t-u\tupdate percent (default 1%%)\n"
			   "\t-t\ttest duration (default 10 sec)\n"
			   "\t-v\tverbose per-second output\n"
			   "\t-d\tconnection string ('host=localhost port=5432')\n");
		return 1;
	}
	if (initialize) {
		PGconn* con = PQconnectdb(connection);
		char sql[256];
		sprintf(sql, "insert into t values (generate_series(1, %d), 0)", n_records);
		PQexec(con, "drop table t");
		PQexec(con, "create table t(k integer primary key, v integer)");
		PQexec(con, sql);
		PQfinish(con);
	}
	for (i = 0; i < n_clients; i++) {
		thread_updates[i] = 0;
		thread_selects[i] = 0;
		pthread_create(&threads[i], NULL, worker, (void*)(size_t)i);
	}
	finish = time(NULL) + test_duration;
	do {
		total_selects = 0;
		total_updates = 0;
		sleep(1);
		for (i = 0; i < n_clients; i++) {
			total_selects += selects[i] - thread_selects[i];
			thread_selects[i] = selects[i];
			total_updates += updates[i] - thread_updates[i];
			thread_updates[i] = updates[i];
		}
		if (verbose)
			printf("%d: %ld (%ld updates, %ld selects)\n", ++iteration, total_updates + total_selects, total_updates, total_selects);
	} while (time(NULL) < finish);

	total_selects = 0;
	total_updates = 0;
	for (i = 0; i < n_clients; i++) {
		total_selects += selects[i];
		total_updates += updates[i];
	}
	if (verbose)
		printf("Summary: %ld (%ld updates, %ld selects)\n", (total_updates + total_selects)/test_duration, total_updates, total_selects);
	else
		printf("%ld\n", (total_updates + total_selects)/test_duration);

	termination = 1;

	for (i = 0; i < n_clients; i++) {
		pthread_join(threads[i], NULL);
	}
	return 0;
}
