Hello everyone!
There's the second version of my patch for pgbench. Now transactions
with serialization and deadlock failures are rolled back and retried
until they end successfully or their number of attempts reaches maximum.
In details:
- You can set the maximum number of attempts by the appropriate
benchmarking option (--max-attempts-number). Its default value is 1
partly because retrying of shell commands can produce new errors.
- Statistics of attempts and failures is printed in progress, in
transaction / aggregation logs and in the end with other results (all
and for each script). The transaction failure is reported here only if
the last retry of this transaction fails.
- Also failures and average numbers of transactions attempts are printed
per-command with average latencies if you use the appropriate
benchmarking option (--report-per-command, -r) (it replaces the option
--report-latencies as I was advised here [1]). Average numbers of
transactions attempts are printed only for commands which start
transactions.
As usual: TAP tests for new functionality and changed documentation with
new examples.
Patch is attached. Any suggestions are welcome!
[1]
https://www.postgresql.org/message-id/alpine.DEB.2.20.1707031321370.3419%40lancre
--
Marina Polyakova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
From 58f51cdc896af801bcd35e495406655ca03aa6ce Mon Sep 17 00:00:00 2001
From: Marina Polyakova <m.polyak...@postgrespro.ru>
Date: Mon, 10 Jul 2017 13:33:41 +0300
Subject: [PATCH v2] Pgbench Retry transactions with serialization or deadlock
errors
Now transactions with serialization or deadlock failures can be rolled back and
retried again and again until they end successfully or their number of attempts
reaches maximum. You can set the maximum number of attempts by the appropriate
benchmarking option (--max-attempts-number). Its default value is 1. Statistics
of attempts and failures is printed in progress, in transaction / aggregation
logs and in the end with other results (all and for each script). The
transaction failure is reported here only if the last retry of this transaction
fails. Also failures and average numbers of transactions attempts are printed
per-command with average latencies if you use the appropriate benchmarking
option (--report-per-command, -r). Average numbers of transactions attempts are
printed only for commands which start transactions.
---
doc/src/sgml/ref/pgbench.sgml | 277 ++++++--
src/bin/pgbench/pgbench.c | 751 ++++++++++++++++++---
src/bin/pgbench/t/002_serialization_errors.pl | 121 ++++
src/bin/pgbench/t/003_deadlock_errors.pl | 130 ++++
src/bin/pgbench/t/004_retry_failed_transactions.pl | 280 ++++++++
5 files changed, 1421 insertions(+), 138 deletions(-)
create mode 100644 src/bin/pgbench/t/002_serialization_errors.pl
create mode 100644 src/bin/pgbench/t/003_deadlock_errors.pl
create mode 100644 src/bin/pgbench/t/004_retry_failed_transactions.pl
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 64b043b..dc1daa9 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -49,22 +49,34 @@
<screen>
transaction type: <builtin: TPC-B (sort of)>
+transaction maximum attempts number: 1
scaling factor: 10
query mode: simple
number of clients: 10
number of threads: 1
number of transactions per client: 1000
number of transactions actually processed: 10000/10000
+number of transactions with serialization failures: 0 (0.000 %)
+number of transactions with deadlock failures: 0 (0.000 %)
+attempts number average = 1.00
+attempts number stddev = 0.00
tps = 85.184871 (including connections establishing)
tps = 85.296346 (excluding connections establishing)
</screen>
- The first six lines report some of the most important parameter
+ The first seven lines report some of the most important parameter
settings. The next line reports the number of transactions completed
and intended (the latter being just the product of number of clients
and number of transactions per client); these will be equal unless the run
failed before completion. (In <option>-T</> mode, only the actual
number of transactions is printed.)
+ The next four lines report the number of transactions with serialization and
+ deadlock failures, and also the statistics of transactions attempts. With
+ such errors, transactions are rolled back and are repeated again and again
+ until they end sucessufully or their number of attempts reaches maximum (to
+ change this maximum see the appropriate benchmarking option
+ <option>--max-attempts-number</>). The transaction failure is reported here
+ only if the last retry of this transaction fails.
The last two lines report the number of transactions per second,
figured with and without counting the time to start database sessions.
</para>
@@ -434,24 +446,28 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
<listitem>
<para>
Show progress report every <replaceable>sec</> seconds. The report
- includes the time since the beginning of the run, the tps since the
- last report, and the transaction latency average and standard
- deviation since the last report. Under throttling (<option>-R</>),
- the latency is computed with respect to the transaction scheduled
- start time, not the actual transaction beginning time, thus it also
- includes the average schedule lag time.
+ includes the time since the beginning of the run and the following
+ statistics since the last report: the tps, the transaction latency
+ average and standard deviation, the number of transactions with
+ serialization and deadlock failures, and the average number of
+ transactions attempts and its standard deviation. Under throttling
+ (<option>-R</>), the latency is computed with respect to the transaction
+ scheduled start time, not the actual transaction beginning time, thus it
+ also includes the average schedule lag time.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-r</option></term>
- <term><option>--report-latencies</option></term>
+ <term><option>--report-per-command</option></term>
<listitem>
<para>
- Report the average per-statement latency (execution time from the
- perspective of the client) of each command after the benchmark
- finishes. See below for details.
+ Report the following statistics for each command after the benchmark
+ finishes: the average per-statement latency (execution time from the
+ perspective of the client), the number of serialization and deadlock
+ failures, and the average number of transactions attempts (only for
+ commands that start transactions). See below for details.
</para>
</listitem>
</varlistentry>
@@ -496,6 +512,15 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
</para>
<para>
+ Transactions with serialization or deadlock failures (or with both
+ of them if used script contains several transactions; see
+ <xref linkend="transactions-and-scripts"
+ endterm="transactions-and-scripts-title"> for more information) are
+ marked separately and their time is not reported as for skipped
+ transactions.
+ </para>
+
+ <para>
A high schedule lag time is an indication that the system cannot
process transactions at the specified rate, with the chosen number of
clients and threads. When the average transaction execution time is
@@ -590,6 +615,23 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
</varlistentry>
<varlistentry>
+ <term><option>--max-attempts-number=<replaceable>attempts_number</></option></term>
+ <listitem>
+ <para>
+ Set the maximum attempts number for transactions. Default is 1.
+ </para>
+ <note>
+ <para>
+ Be careful if you want to repeat transactions with shell commands
+ inside. Unlike sql commands the result of shell command is not rolled
+ back except for its variable value. If a shell command fails its
+ client is aborted without restarting.
+ </para>
+ </note>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--progress-timestamp</option></term>
<listitem>
<para>
@@ -693,8 +735,8 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
<refsect1>
<title>Notes</title>
- <refsect2>
- <title>What is the <quote>Transaction</> Actually Performed in <application>pgbench</application>?</title>
+ <refsect2 id="transactions-and-scripts">
+ <title id="transactions-and-scripts-title">What is the <quote>Transaction</> Actually Performed in <application>pgbench</application>?</title>
<para>
<application>pgbench</> executes test scripts chosen randomly
@@ -1148,7 +1190,7 @@ END;
The format of the log is:
<synopsis>
-<replaceable>client_id</> <replaceable>transaction_no</> <replaceable>time</> <replaceable>script_no</> <replaceable>time_epoch</> <replaceable>time_us</> <optional> <replaceable>schedule_lag</replaceable> </optional>
+<replaceable>client_id</> <replaceable>transaction_no</> <replaceable>time</> <replaceable>script_no</> <replaceable>time_epoch</> <replaceable>time_us</> <replaceable>average_attempts_number</> <optional> <replaceable>schedule_lag</replaceable> </optional>
</synopsis>
where
@@ -1158,39 +1200,53 @@ END;
<replaceable>time</> is the total elapsed transaction time in microseconds,
<replaceable>script_no</> identifies which script file was used (useful when
multiple scripts were specified with <option>-f</> or <option>-b</>),
- and <replaceable>time_epoch</>/<replaceable>time_us</> are a
+ <replaceable>time_epoch</>/<replaceable>time_us</> are a
Unix-epoch time stamp and an offset
in microseconds (suitable for creating an ISO 8601
time stamp with fractional seconds) showing when
- the transaction completed.
+ the transaction completed,
+ and <replaceable>average_attempts_number</> is the average number of
+ transactions attempts during the current script execution.
The <replaceable>schedule_lag</> field is the difference between the
transaction's scheduled start time, and the time it actually started, in
microseconds. It is only present when the <option>--rate</> option is used.
When both <option>--rate</> and <option>--latency-limit</> are used,
the <replaceable>time</> for a skipped transaction will be reported as
<literal>skipped</>.
+ If a transaction has serialization and/or deadlock failures, its
+ <replaceable>time</> will be reported as <literal>serialization failure</>,
+ <literal>deadlock failure</>, or
+ <literal>serialization and deadlock failures</>, respectively.
</para>
+ <note>
+ <para>
+ Transactions can have both serialization and deadlock failures if the
+ used script contained several transactions. See
+ <xref linkend="transactions-and-scripts"
+ endterm="transactions-and-scripts-title"> for more information.
+ </para>
+ </note>
<para>
Here is a snippet of a log file generated in a single-client run:
<screen>
-0 199 2241 0 1175850568 995598
-0 200 2465 0 1175850568 998079
-0 201 2513 0 1175850569 608
-0 202 2038 0 1175850569 2663
+0 199 2241 0 1175850568 995598 1
+0 200 2465 0 1175850568 998079 1
+0 201 2513 0 1175850569 608 1
+0 202 2038 0 1175850569 2663 1
</screen>
Another example with <literal>--rate=100</>
and <literal>--latency-limit=5</> (note the additional
<replaceable>schedule_lag</> column):
<screen>
-0 81 4621 0 1412881037 912698 3005
-0 82 6173 0 1412881037 914578 4304
-0 83 skipped 0 1412881037 914578 5217
-0 83 skipped 0 1412881037 914578 5099
-0 83 4722 0 1412881037 916203 3108
-0 84 4142 0 1412881037 918023 2333
-0 85 2465 0 1412881037 919759 740
+0 81 4621 0 1412881037 912698 1 3005
+0 82 6173 0 1412881037 914578 1 4304
+0 83 skipped 0 1412881037 914578 1 5217
+0 83 skipped 0 1412881037 914578 1 5099
+0 83 4722 0 1412881037 916203 1 3108
+0 84 4142 0 1412881037 918023 1 2333
+0 85 2465 0 1412881037 919759 1 740
</screen>
In this example, transaction 82 was late, because its latency (6.173 ms) was
over the 5 ms limit. The next two transactions were skipped, because they
@@ -1198,6 +1254,22 @@ END;
</para>
<para>
+ Example with serialization failures (the maximum number of attempts is 10):
+<screen>
+3 0 47423 0 1499414498 34501 4
+3 1 8333 0 1499414498 42848 1
+3 2 8358 0 1499414498 51219 1
+4 0 72345 0 1499414498 59433 7
+1 3 41718 0 1499414498 67879 5
+1 4 8416 0 1499414498 76311 1
+3 3 33235 0 1499414498 84469 4
+0 0 serialization_failure 0 1499414498 84905 10
+2 0 serialization_failure 0 1499414498 86248 10
+3 4 8307 0 1499414498 92788 1
+</screen>
+ </para>
+
+ <para>
When running a long test on hardware that can handle a lot of transactions,
the log files can become very large. The <option>--sampling-rate</> option
can be used to log only a random sample of transactions.
@@ -1212,7 +1284,7 @@ END;
format is used for the log files:
<synopsis>
-<replaceable>interval_start</> <replaceable>num_transactions</> <replaceable>sum_latency</> <replaceable>sum_latency_2</> <replaceable>min_latency</> <replaceable>max_latency</> <optional> <replaceable>sum_lag</> <replaceable>sum_lag_2</> <replaceable>min_lag</> <replaceable>max_lag</> <optional> <replaceable>skipped</> </optional> </optional>
+<replaceable>interval_start</> <replaceable>num_transactions</> <replaceable>sum_latency</> <replaceable>sum_latency_2</> <replaceable>min_latency</> <replaceable>max_latency</> <replaceable>num_serialization_failures_transactions</> <replaceable>num_deadlock_failures_transactions</> <replaceable>attempts_count</> <replaceable>attempts_sum</> <replaceable>attempts_sum2</> <replaceable>attempts_min</> <replaceable>attempts_max</> <optional> <replaceable>sum_lag</> <replaceable>sum_lag_2</> <replaceable>min_lag</> <replaceable>max_lag</> <optional> <replaceable>skipped</> </optional> </optional>
</synopsis>
where
@@ -1226,7 +1298,14 @@ END;
transaction latencies within the interval,
<replaceable>min_latency</> is the minimum latency within the interval,
and
- <replaceable>max_latency</> is the maximum latency within the interval.
+ <replaceable>max_latency</> is the maximum latency within the interval,
+ <replaceable>num_serialization_failures_transactions</> and
+ <replaceable>num_deadlock_failures_transactions</> - the numbers of
+ transactions with the corresponding failures within the interval,
+ <replaceable>attempts_count</>, <replaceable>attempts_sum</>,
+ <replaceable>attempts_sum2</>, <replaceable>attempts_min</> and
+ <replaceable>attempts_max</> - the statistics of transactions attempts within
+ the interval.
The next fields,
<replaceable>sum_lag</>, <replaceable>sum_lag_2</>, <replaceable>min_lag</>,
and <replaceable>max_lag</>, are only present if the <option>--rate</>
@@ -1241,14 +1320,23 @@ END;
Each transaction is counted in the interval when it was committed.
</para>
+ <note>
+ <para>
+ The number of transactions attempts within the interval can be greater than
+ the number of transactions within this interval multiplied by the maximum
+ attempts number. See <xref linkend="transactions-and-scripts"
+ endterm="transactions-and-scripts-title"> for more information.
+ </para>
+ </note>
+
<para>
Here is some example output:
<screen>
-1345828501 5601 1542744 483552416 61 2573
-1345828503 7884 1979812 565806736 60 1479
-1345828505 7208 1979422 567277552 59 1391
-1345828507 7685 1980268 569784714 60 1398
-1345828509 7073 1979779 573489941 236 1411
+1345828501 5601 1542744 483552416 61 2573 0 0 5601 5601 5601 1 1
+1345828503 7884 1979812 565806736 60 1479 0 0 7884 7884 7884 1 1
+1345828505 7208 1979422 567277552 59 1391 0 0 7208 7208 7884 1 1
+1345828507 7685 1980268 569784714 60 1398 0 0 7685 7685 7685 1 1
+1345828509 7073 1979779 573489941 236 1411 0 0 7073 7073 7073 1 1
</screen></para>
<para>
@@ -1260,13 +1348,41 @@ END;
</refsect2>
<refsect2>
- <title>Per-Statement Latencies</title>
+ <title>Per-Statement Report</title>
<para>
- With the <option>-r</> option, <application>pgbench</> collects
- the elapsed transaction time of each statement executed by every
- client. It then reports an average of those values, referred to
- as the latency for each statement, after the benchmark has finished.
+ With the <option>-r</> option, <application>pgbench</> collects the following
+ statistics for each statement:
+ <itemizedlist>
+ <listitem>
+ <para>
+ the elapsed transaction time of each statement; <application>pgbench</>
+ reports an average of those values, referred to as the latency for each
+ statement;
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ the number of serialization and deadlock failures;
+ </para>
+ <note>
+ <para>The total sum of per-command failures of each type can be greater
+ than the number of transactions with reported failures.
+ See <xref linkend="transactions-and-scripts"
+ endterm="transactions-and-scripts-title"> for more information.
+ </para>
+ </note>
+ </listitem>
+ <listitem>
+ <para>
+ the average number of transaction attempts for command which start this
+ transaction;
+ </para>
+ </listitem>
+ </itemizedlist>
+
+ All values are computed for each statement executed by every client and are
+ reported after the benchmark has finished.
</para>
<para>
@@ -1274,35 +1390,90 @@ END;
<screen>
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
+transaction maximum attempts number: 1
scaling factor: 1
query mode: simple
number of clients: 10
number of threads: 1
number of transactions per client: 1000
number of transactions actually processed: 10000/10000
+number of transactions with serialization failures: 0 (0.000 %)
+number of transactions with deadlock failures: 0 (0.000 %)
+attempts number average = 1.00
+attempts number stddev = 0.00
latency average = 15.844 ms
latency stddev = 2.715 ms
tps = 618.764555 (including connections establishing)
tps = 622.977698 (excluding connections establishing)
script statistics:
- - statement latencies in milliseconds:
- 0.002 \set aid random(1, 100000 * :scale)
- 0.005 \set bid random(1, 1 * :scale)
- 0.002 \set tid random(1, 10 * :scale)
- 0.001 \set delta random(-5000, 5000)
- 0.326 BEGIN;
- 0.603 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
- 0.454 SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
- 5.528 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
- 7.335 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
- 0.371 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
- 1.212 END;
+ - statement latencies in milliseconds, serialization & deadlock failures,
+ numbers of transactions attempts:
+ 0.002 0 0 - \set aid random(1, 100000 * :scale)
+ 0.005 0 0 - \set bid random(1, 1 * :scale)
+ 0.002 0 0 - \set tid random(1, 10 * :scale)
+ 0.001 0 0 - \set delta random(-5000, 5000)
+ 0.326 0 0 1.00 BEGIN;
+ 0.603 0 0 - UPDATE pgbench_accounts
+ SET abalance = abalance + :delta WHERE aid = :aid;
+ 0.454 0 0 - SELECT abalance FROM pgbench_accounts
+ WHERE aid = :aid;
+ 5.528 0 0 - UPDATE pgbench_tellers
+ SET tbalance = tbalance + :delta WHERE tid = :tid;
+ 7.335 0 0 - UPDATE pgbench_branches
+ SET bbalance = bbalance + :delta WHERE bid = :bid;
+ 0.371 0 0 - INSERT INTO pgbench_history
+ (tid, bid, aid, delta, mtime)
+ VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
+ 1.212 0 0 - END;
+</screen>
+
+ Another example of output for the default script using serializable default
+ transaction isolation level (<command>PGOPTIONS='-c
+ default_transaction_isolation=serializable' pgbench ...</command>):
+<screen>
+starting vacuum...end.
+transaction type: <builtin: TPC-B (sort of)>
+transaction maximum attempts number: 100
+scaling factor: 1
+query mode: simple
+number of clients: 10
+number of threads: 1
+number of transactions per client: 1000
+number of transactions actually processed: 10000/10000
+number of transactions with serialization failures: 3599 (35.990 %)
+number of transactions with deadlock failures: 0 (0.000 %)
+attempts number average = 47.54
+attempts number stddev = 44.04
+latency average = 235.795 ms
+latency stddev = 408.854 ms
+tps = 26.694245 (including connections establishing)
+tps = 26.697308 (excluding connections establishing)
+script statistics:
+ - statement latencies in milliseconds, serialization & deadlock failures,
+ numbers of transactions attempts:
+ 0.003 0 0 - \set aid random(1, 100000 * :scale)
+ 0.001 0 0 - \set bid random(1, 1 * :scale)
+ 0.001 0 0 - \set tid random(1, 10 * :scale)
+ 0.000 0 0 - \set delta random(-5000, 5000)
+ 4.626 0 0 47.54 BEGIN;
+ 1.165 0 0 - UPDATE pgbench_accounts
+ SET abalance = abalance + :delta WHERE aid = :aid;
+ 0.870 0 0 - SELECT abalance FROM pgbench_accounts
+ WHERE aid = :aid;
+ 1.060 456156 0 - UPDATE pgbench_tellers
+ SET tbalance = tbalance + :delta WHERE tid = :tid;
+ 0.883 12826 0 - UPDATE pgbench_branches
+ SET bbalance = bbalance + :delta WHERE bid = :bid;
+ 1.052 0 0 - INSERT INTO pgbench_history
+ (tid, bid, aid, delta, mtime)
+ VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
+ 4.866 36 0 - END;
</screen>
</para>
<para>
- If multiple script files are specified, the averages are reported
- separately for each script file.
+ If multiple script files are specified, the averages and the failures are
+ reported separately for each script file.
</para>
<para>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 4d364a1..2e84d34 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -58,6 +58,8 @@
#include "pgbench.h"
+#define ERRCODE_T_R_SERIALIZATION_FAILURE "40001"
+#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
#define ERRCODE_UNDEFINED_TABLE "42P01"
/*
@@ -174,8 +176,12 @@ bool progress_timestamp = false; /* progress report with Unix time */
int nclients = 1; /* number of clients */
int nthreads = 1; /* number of threads */
bool is_connect; /* establish connection for each transaction */
-bool is_latencies; /* report per-command latencies */
+bool report_per_command = false; /* report per-command latencies,
+ * failures and attempts */
int main_pid; /* main process id used in log filename */
+int max_attempts_number = 1; /* maximum number of attempts to run the
+ * transaction with serialization or
+ * deadlock failures */
char *pghost = "";
char *pgport = "";
@@ -232,11 +238,35 @@ typedef struct StatsData
int64 cnt; /* number of transactions */
int64 skipped; /* number of transactions skipped under --rate
* and --latency-limit */
+ int64 serialization_failures; /* number of transactions with
+ * serialization failures */
+ int64 deadlock_failures; /* number of transactions with deadlock
+ * failures */
+ SimpleStats attempts;
SimpleStats latency;
SimpleStats lag;
} StatsData;
/*
+ * Data structure for repeating a transaction from the beginnning with the same
+ * parameters.
+ */
+typedef struct LastBeginState
+{
+ int command; /* command number in script */
+ int attempts_number; /* how many times have we tried to run the
+ * transaction without serialization or
+ * deadlock failures */
+
+ unsigned short random_state[3]; /* random seed */
+
+ /* client variables */
+ Variable *variables; /* array of variable definitions */
+ int nvariables; /* number of variables */
+ bool vars_sorted; /* are variables sorted by name? */
+} LastBeginState;
+
+/*
* Connection state machine states.
*/
typedef enum
@@ -287,6 +317,20 @@ typedef enum
CSTATE_END_COMMAND,
/*
+ * States for transactions with serialization or deadlock failures.
+ *
+ * First of all report about the failure in CSTATE_FAILURE. Then if we
+ * should end the failed transaction block go to states CSTATE_START_COMMAND
+ * -> CSTATE_WAIT_RESULT -> CSTATE_END_COMMAND with the appropriate command.
+ * After that if we are able to repeat the failed transaction go to
+ * CSTATE_RETRY_FAILED_TRANSACTION to set the same parameters for the
+ * transaction execution as they were in the previous attempts. Otherwise go
+ * to the next command after the failed transaction.
+ */
+ CSTATE_FAILURE,
+ CSTATE_RETRY_FAILED_TRANSACTION,
+
+ /*
* CSTATE_END_TX performs end-of-transaction processing. Calculates
* latency, and logs the transaction. In --connect mode, closes the
* current connection. Chooses the next script to execute and starts over
@@ -311,6 +355,7 @@ typedef struct
PGconn *con; /* connection handle to DB */
int id; /* client No. */
ConnectionStateEnum state; /* state machine's current state. */
+ unsigned short random_state[3]; /* separate randomness for each client */
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
@@ -328,6 +373,17 @@ typedef struct
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
+ bool serialization_failure; /* if there was serialization failure
+ * during script execution */
+ bool deadlock_failure; /* if there was deadlock failure during
+ * script execution */
+
+ /* for repeating transactions with serialization or deadlock failures: */
+ LastBeginState *last_begin_state;
+ bool end_failed_transaction_block; /* are we ending the failed
+ * transaction block? */
+ SimpleStats attempts;
+
/* per client collected stats */
int64 cnt; /* transaction count */
int ecnt; /* error count */
@@ -342,7 +398,6 @@ typedef struct
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state[] */
- unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
FILE *logfile; /* where to log, or NULL */
@@ -382,6 +437,16 @@ typedef struct
char *argv[MAX_ARGS]; /* command word list */
PgBenchExpr *expr; /* parsed expression, if needed */
SimpleStats stats; /* time spent in this command */
+ int64 serialization_failures; /* number of serialization failures in
+ * this command */
+ int64 deadlock_failures; /* number of deadlock failures in this
+ * command */
+ SimpleStats attempts; /* is valid if command starts a transaction */
+
+ /* for repeating transactions with serialization and deadlock failures: */
+ bool is_transaction_begin; /* do we start a transaction? */
+ int transaction_end; /* command number to complete transaction
+ * starting in this command. */
} Command;
typedef struct ParsedScript
@@ -504,7 +569,7 @@ usage(void)
" protocol for submitting queries (default: simple)\n"
" -n, --no-vacuum do not run VACUUM before tests\n"
" -P, --progress=NUM show thread progress report every NUM seconds\n"
- " -r, --report-latencies report average latency per command\n"
+ " -r, --report-per-command report latencies, failures and attempts per command\n"
" -R, --rate=NUM target rate in transactions per second\n"
" -s, --scale=NUM report this scale factor in output\n"
" -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
@@ -513,6 +578,8 @@ usage(void)
" --aggregate-interval=NUM aggregate data over NUM seconds\n"
" --log-prefix=PREFIX prefix for transaction time log file\n"
" (default: \"pgbench_log\")\n"
+ " --max-attempts-number=NUM\n"
+ " max number of tries to run transaction (default: 1)\n"
" --progress-timestamp use Unix epoch timestamps for progress\n"
" --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
"\nCommon options:\n"
@@ -624,7 +691,7 @@ gotdigits:
/* random number generator: uniform distribution from min to max inclusive */
static int64
-getrand(TState *thread, int64 min, int64 max)
+getrand(CState *st, int64 min, int64 max)
{
/*
* Odd coding is so that min and max have approximately the same chance of
@@ -635,7 +702,7 @@ getrand(TState *thread, int64 min, int64 max)
* protected by a mutex, and therefore a bottleneck on machines with many
* CPUs.
*/
- return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
+ return min + (int64) ((max - min + 1) * pg_erand48(st->random_state));
}
/*
@@ -644,7 +711,7 @@ getrand(TState *thread, int64 min, int64 max)
* value is exp(-parameter).
*/
static int64
-getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
+getExponentialRand(CState *st, int64 min, int64 max, double parameter)
{
double cut,
uniform,
@@ -654,7 +721,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
Assert(parameter > 0.0);
cut = exp(-parameter);
/* erand in [0, 1), uniform in (0, 1] */
- uniform = 1.0 - pg_erand48(thread->random_state);
+ uniform = 1.0 - pg_erand48(st->random_state);
/*
* inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
@@ -667,7 +734,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
/* random number generator: gaussian distribution from min to max inclusive */
static int64
-getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
+getGaussianRand(CState *st, int64 min, int64 max, double parameter)
{
double stdev;
double rand;
@@ -695,8 +762,8 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
* are expected in (0, 1] (see
* http://en.wikipedia.org/wiki/Box_muller)
*/
- double rand1 = 1.0 - pg_erand48(thread->random_state);
- double rand2 = 1.0 - pg_erand48(thread->random_state);
+ double rand1 = 1.0 - pg_erand48(st->random_state);
+ double rand2 = 1.0 - pg_erand48(st->random_state);
/* Box-Muller basic form transform */
double var_sqrt = sqrt(-2.0 * log(rand1));
@@ -723,7 +790,7 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
* will approximate a Poisson distribution centered on the given value.
*/
static int64
-getPoissonRand(TState *thread, int64 center)
+getPoissonRand(CState *st, int64 center)
{
/*
* Use inverse transform sampling to generate a value > 0, such that the
@@ -732,7 +799,7 @@ getPoissonRand(TState *thread, int64 center)
double uniform;
/* erand in [0, 1), uniform in (0, 1] */
- uniform = 1.0 - pg_erand48(thread->random_state);
+ uniform = 1.0 - pg_erand48(st->random_state);
return (int64) (-log(uniform) * ((double) center) + 0.5);
}
@@ -786,24 +853,42 @@ initStats(StatsData *sd, time_t start_time)
sd->start_time = start_time;
sd->cnt = 0;
sd->skipped = 0;
+ sd->serialization_failures = 0;
+ sd->deadlock_failures = 0;
+ initSimpleStats(&sd->attempts);
initSimpleStats(&sd->latency);
initSimpleStats(&sd->lag);
}
/*
- * Accumulate one additional item into the given stats object.
+ * Accumulate statistics regardless of whether there was a failure / transaction
+ * was skipped or not.
*/
static void
-accumStats(StatsData *stats, bool skipped, double lat, double lag)
+accumMainStats(StatsData *stats, bool skipped, bool serialization_failure,
+ bool deadlock_failure, SimpleStats *attempts)
{
stats->cnt++;
-
if (skipped)
- {
- /* no latency to record on skipped transactions */
stats->skipped++;
- }
- else
+ else if (serialization_failure)
+ stats->serialization_failures++;
+ else if (deadlock_failure)
+ stats->deadlock_failures++;
+ mergeSimpleStats(&stats->attempts, attempts);
+}
+
+/*
+ * Accumulate one additional item into the given stats object.
+ */
+static void
+accumStats(StatsData *stats, bool skipped, bool serialization_failure,
+ bool deadlock_failure, double lat, double lag, SimpleStats *attempts)
+{
+ accumMainStats(stats, skipped, serialization_failure, deadlock_failure,
+ attempts);
+
+ if (!skipped && !serialization_failure && !deadlock_failure)
{
addToSimpleStats(&stats->latency, lat);
@@ -1593,7 +1678,7 @@ evalFunc(TState *thread, CState *st,
if (func == PGBENCH_RANDOM)
{
Assert(nargs == 2);
- setIntValue(retval, getrand(thread, imin, imax));
+ setIntValue(retval, getrand(st, imin, imax));
}
else /* gaussian & exponential */
{
@@ -1615,7 +1700,7 @@ evalFunc(TState *thread, CState *st,
}
setIntValue(retval,
- getGaussianRand(thread, imin, imax, param));
+ getGaussianRand(st, imin, imax, param));
}
else /* exponential */
{
@@ -1628,7 +1713,7 @@ evalFunc(TState *thread, CState *st,
}
setIntValue(retval,
- getExponentialRand(thread, imin, imax, param));
+ getExponentialRand(st, imin, imax, param));
}
}
@@ -1817,7 +1902,7 @@ commandFailed(CState *st, char *message)
/* return a script number with a weighted choice. */
static int
-chooseScript(TState *thread)
+chooseScript(CState *st)
{
int i = 0;
int64 w;
@@ -1825,7 +1910,7 @@ chooseScript(TState *thread)
if (num_scripts == 1)
return 0;
- w = getrand(thread, 0, total_weight - 1);
+ w = getrand(st, 0, total_weight - 1);
do
{
w -= sql_script[i++].weight;
@@ -1951,6 +2036,48 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
return true;
}
+static void
+free_variables_pointers(Variable *variables, int nvariables)
+{
+ Variable *current;
+
+ for (current = variables; current - variables < nvariables; ++current)
+ {
+ pg_free(current->name);
+ current->name = NULL;
+
+ pg_free(current->value);
+ current->value = NULL;
+ }
+}
+
+/* return a deep copy of variables array */
+static Variable *
+copy_variables(Variable *destination, int destination_nvariables,
+ const Variable *source, int source_nvariables)
+{
+ Variable *current_destination;
+ const Variable *current_source;
+
+ free_variables_pointers(destination, destination_nvariables);
+ destination = pg_realloc(destination, sizeof(Variable) * source_nvariables);
+
+ for (current_source = source, current_destination = destination;
+ current_source - source < source_nvariables;
+ ++current_source, ++current_destination)
+ {
+ current_destination->name = pg_strdup(current_source->name);
+ if (current_source->value)
+ current_destination->value = pg_strdup(current_source->value);
+ else
+ current_destination->value = NULL;
+ current_destination->is_numeric = current_source->is_numeric;
+ current_destination->num_value = current_source->num_value;
+ }
+
+ return destination;
+}
+
/*
* Advance the state machine of a connection, if possible.
*/
@@ -1962,6 +2089,14 @@ doCustom(TState *thread, CState *st, StatsData *agg)
instr_time now;
bool end_tx_processed = false;
int64 wait;
+ bool serialization_failure = false;
+ bool deadlock_failure = false;
+ ExecStatusType result_status;
+ char *sqlState;
+ int last_begin_command;
+ Command *last_begin;
+ int attempts_number;
+ int transaction_end;
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
@@ -1990,7 +2125,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
*/
case CSTATE_CHOOSE_SCRIPT:
- st->use_file = chooseScript(thread);
+ st->use_file = chooseScript(st);
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
@@ -2017,7 +2152,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* away.
*/
Assert(throttle_delay > 0);
- wait = getPoissonRand(thread, throttle_delay);
+ wait = getPoissonRand(st, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
@@ -2049,7 +2184,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
processXactStats(thread, st, &now, true, agg);
/* next rendez-vous */
- wait = getPoissonRand(thread, throttle_delay);
+ wait = getPoissonRand(st, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
}
@@ -2121,6 +2256,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
}
+ /* reset transaction variables to default values */
+ st->serialization_failure = false;
+ st->deadlock_failure = false;
+ initSimpleStats(&st->attempts);
+
/* Begin with the first command */
st->command = 0;
st->state = CSTATE_START_COMMAND;
@@ -2142,11 +2282,39 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
}
+ /* reset command result variables to default values */
+ serialization_failure = false;
+ deadlock_failure = false;
+
+ if (command->is_transaction_begin && !st->last_begin_state)
+ {
+ /*
+ * It is a first attempt to run the transaction which begins
+ * in current command. Remember its parameters just in case
+ * we should repeat it in future.
+ */
+ st->last_begin_state = (LastBeginState *)
+ pg_malloc0(sizeof(LastBeginState));
+
+ st->last_begin_state->command = st->command;
+ st->last_begin_state->attempts_number = 1;
+ memcpy(st->last_begin_state->random_state, st->random_state,
+ sizeof(unsigned short) * 3);
+
+ st->last_begin_state->variables = copy_variables(
+ st->last_begin_state->variables,
+ st->last_begin_state->nvariables,
+ st->variables,
+ st->nvariables);
+ st->last_begin_state->nvariables = st->nvariables;
+ st->last_begin_state->vars_sorted = st->vars_sorted;
+ }
+
/*
* Record statement start time if per-command latencies are
* requested
*/
- if (is_latencies)
+ if (report_per_command)
{
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
@@ -2299,21 +2467,41 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* Read and discard the query result;
*/
res = PQgetResult(st->con);
- switch (PQresultStatus(res))
+ result_status = PQresultStatus(res);
+ sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ if (sqlState) {
+ serialization_failure =
+ (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) ==
+ 0);
+ deadlock_failure =
+ strcmp(sqlState, ERRCODE_T_R_DEADLOCK_DETECTED) == 0;
+
+ if (debug && (serialization_failure || deadlock_failure))
+ fprintf(stderr, "client %d got a %s failure (attempt %d/%d)\n",
+ st->id,
+ (serialization_failure ?
+ "serialization" :
+ "deadlock"),
+ st->last_begin_state->attempts_number,
+ max_attempts_number);
+ }
+
+ if (result_status == PGRES_COMMAND_OK ||
+ result_status == PGRES_TUPLES_OK ||
+ result_status == PGRES_EMPTY_QUERY ||
+ serialization_failure ||
+ deadlock_failure)
{
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- case PGRES_EMPTY_QUERY:
- /* OK */
- PQclear(res);
- discard_response(st);
- st->state = CSTATE_END_COMMAND;
- break;
- default:
- commandFailed(st, PQerrorMessage(st->con));
- PQclear(res);
- st->state = CSTATE_ABORTED;
- break;
+ /* OK */
+ PQclear(res);
+ discard_response(st);
+ st->state = CSTATE_END_COMMAND;
+ }
+ else
+ {
+ commandFailed(st, PQerrorMessage(st->con));
+ PQclear(res);
+ st->state = CSTATE_ABORTED;
}
break;
@@ -2337,12 +2525,70 @@ doCustom(TState *thread, CState *st, StatsData *agg)
*/
case CSTATE_END_COMMAND:
+ if (st->last_begin_state)
+ {
+ last_begin_command = st->last_begin_state->command;
+ last_begin =
+ sql_script[st->use_file].commands[last_begin_command];
+ transaction_end = last_begin->transaction_end;
+ attempts_number = st->last_begin_state->attempts_number;
+
+ if ((st->command == transaction_end) &&
+ ((!st->end_failed_transaction_block &&
+ !serialization_failure &&
+ !deadlock_failure) ||
+ attempts_number == max_attempts_number))
+ {
+ /*
+ * It is the end of transaction and:
+ * 1) this transaction was successful;
+ * 2) or this transaction has failed and we will not be
+ * able to repeat it.
+ *
+ * So let's record its number of attempts in statistics
+ * per-command and for current script execution. Also
+ * let's free its begin state because we don't not need
+ * it anymore.
+ */
+ if (debug)
+ {
+ char buffer[256];
+
+ if (serialization_failure ||
+ deadlock_failure ||
+ st->end_failed_transaction_block)
+ snprintf(buffer, sizeof(buffer), "failure");
+ else
+ snprintf(buffer, sizeof(buffer), "successful");
+
+ fprintf(stderr, "client %d ends transaction with %d attempts (%s)\n",
+ st->id, attempts_number, buffer);
+ }
+
+ addToSimpleStats(&last_begin->attempts,
+ attempts_number);
+ addToSimpleStats(&st->attempts, attempts_number);
+
+ free_variables_pointers(
+ st->last_begin_state->variables,
+ st->last_begin_state->nvariables);
+ pg_free(st->last_begin_state);
+ st->last_begin_state = NULL;
+ }
+ }
+
+ if (serialization_failure || deadlock_failure)
+ {
+ st->state = CSTATE_FAILURE;
+ break;
+ }
+
/*
* command completed: accumulate per-command execution times
* in thread-local data structure, if per-command latencies
* are requested.
*/
- if (is_latencies)
+ if (report_per_command)
{
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
@@ -2354,8 +2600,102 @@ doCustom(TState *thread, CState *st, StatsData *agg)
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
- /* Go ahead with next command */
- st->command++;
+ if (st->end_failed_transaction_block &&
+ attempts_number < max_attempts_number)
+ {
+ st->command = last_begin_command;
+ st->state = CSTATE_RETRY_FAILED_TRANSACTION;
+ }
+ else
+ {
+ /* Go ahead with next command */
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ }
+ st->end_failed_transaction_block = false;
+
+ break;
+
+ /*
+ * Report about failure and end the failed transaction block.
+ */
+ case CSTATE_FAILURE:
+
+ /*
+ * Accumulate per-command serialization / deadlock failures
+ * count in thread-local data structure.
+ */
+ if (serialization_failure)
+ command->serialization_failures++;
+ if (deadlock_failure)
+ command->deadlock_failures++;
+
+ if (attempts_number == max_attempts_number)
+ {
+ /*
+ * We will not be able to repeat the failed transaction
+ * so let's record this failure in the stats for current
+ * script execution.
+ */
+ if (serialization_failure)
+ st->serialization_failure = true;
+ else if (deadlock_failure)
+ st->deadlock_failure = true;
+ }
+
+ if (st->command != transaction_end)
+ {
+ /* end the failed transaction block */
+ st->command = transaction_end;
+ st->end_failed_transaction_block = true;
+ st->state = CSTATE_START_COMMAND;
+ }
+ else
+ {
+ /*
+ * We are not in transaction block. So let's try to repeat
+ * the failed transaction or go ahead with next command.
+ */
+ if (attempts_number < max_attempts_number)
+ {
+ st->command = last_begin_command;
+ st->state = CSTATE_RETRY_FAILED_TRANSACTION;
+ }
+ else
+ {
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ }
+ }
+ break;
+
+ /*
+ * Set the parameters to retry the failed transaction.
+ */
+ case CSTATE_RETRY_FAILED_TRANSACTION:
+
+ /*
+ * We assume that transaction attempts number (which should be
+ * limited by max_attempts_number) was checked earlier.
+ */
+ if (debug)
+ fprintf(stderr, "client %d repeats the failed transaction (attempt %d/%d)\n",
+ st->id,
+ st->last_begin_state->attempts_number + 1,
+ max_attempts_number);
+
+ st->last_begin_state->attempts_number++;
+ memcpy(st->random_state, st->last_begin_state->random_state,
+ sizeof(unsigned short) * 3);
+
+ st->variables = copy_variables(
+ st->variables,
+ st->nvariables,
+ st->last_begin_state->variables,
+ st->last_begin_state->nvariables);
+ st->nvariables = st->last_begin_state->nvariables;
+ st->vars_sorted = st->last_begin_state->vars_sorted;
+
st->state = CSTATE_START_COMMAND;
break;
@@ -2372,7 +2712,9 @@ doCustom(TState *thread, CState *st, StatsData *agg)
per_script_stats || use_log)
processXactStats(thread, st, &now, false, agg);
else
- thread->stats.cnt++;
+ accumMainStats(&thread->stats, false,
+ st->serialization_failure,
+ st->deadlock_failure, &st->attempts);
if (is_connect)
{
@@ -2426,6 +2768,15 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
/*
+ * return zero if there is no statistics data because of skipped transactions.
+ */
+static double
+get_average_attempts(const SimpleStats *attempts)
+{
+ return (attempts->count == 0 ? 0 : attempts->sum / attempts->count);
+}
+
+/*
* Print log entry after completing one transaction.
*
* We print Unix-epoch timestamps in the log, so that entries can be
@@ -2446,7 +2797,7 @@ doLog(TState *thread, CState *st,
* to the random sample.
*/
if (sample_rate != 0.0 &&
- pg_erand48(thread->random_state) > sample_rate)
+ pg_erand48(st->random_state) > sample_rate)
return;
/* should we aggregate the results or not? */
@@ -2462,13 +2813,20 @@ doLog(TState *thread, CState *st,
while (agg->start_time + agg_interval <= now)
{
/* print aggregated report to logfile */
- fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f " INT64_FORMAT " " INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
(long) agg->start_time,
agg->cnt,
agg->latency.sum,
agg->latency.sum2,
agg->latency.min,
- agg->latency.max);
+ agg->latency.max,
+ agg->serialization_failures,
+ agg->deadlock_failures,
+ agg->attempts.count,
+ agg->attempts.sum,
+ agg->attempts.sum2,
+ agg->attempts.min,
+ agg->attempts.max);
if (throttle_delay)
{
fprintf(logfile, " %.0f %.0f %.0f %.0f",
@@ -2486,22 +2844,34 @@ doLog(TState *thread, CState *st,
}
/* accumulate the current transaction */
- accumStats(agg, skipped, latency, lag);
+ accumStats(agg, skipped, st->serialization_failure,
+ st->deadlock_failure, latency, lag, &st->attempts);
}
else
{
/* no, print raw transactions */
struct timeval tv;
+ char transaction_label[256];
+ double attempts_avg = get_average_attempts(&st->attempts);
- gettimeofday(&tv, NULL);
if (skipped)
- fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
- st->id, st->cnt, st->use_file,
- (long) tv.tv_sec, (long) tv.tv_usec);
+ snprintf(transaction_label, sizeof(transaction_label), "skipped");
+ else if (st->serialization_failure && st->deadlock_failure)
+ snprintf(transaction_label, sizeof(transaction_label),
+ "serialization_and_deadlock_failures");
+ else if (st->serialization_failure || st->deadlock_failure)
+ snprintf(transaction_label, sizeof(transaction_label), "%s_failure",
+ st->serialization_failure ? "serialization" : "deadlock");
+
+ gettimeofday(&tv, NULL);
+ if (skipped || st->serialization_failure || st->deadlock_failure)
+ fprintf(logfile, "%d " INT64_FORMAT " %s %d %ld %ld %.0f",
+ st->id, st->cnt, transaction_label, st->use_file,
+ (long) tv.tv_sec, (long) tv.tv_usec, attempts_avg);
else
- fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
+ fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld %.0f",
st->id, st->cnt, latency, st->use_file,
- (long) tv.tv_sec, (long) tv.tv_usec);
+ (long) tv.tv_sec, (long) tv.tv_usec, attempts_avg);
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
fputc('\n', logfile);
@@ -2523,7 +2893,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
if ((!skipped) && INSTR_TIME_IS_ZERO(*now))
INSTR_TIME_SET_CURRENT(*now);
- if (!skipped)
+ if (!skipped && !st->serialization_failure && !st->deadlock_failure)
{
/* compute latency & lag */
latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
@@ -2532,21 +2902,25 @@ processXactStats(TState *thread, CState *st, instr_time *now,
if (progress || throttle_delay || latency_limit)
{
- accumStats(&thread->stats, skipped, latency, lag);
+ accumStats(&thread->stats, skipped, st->serialization_failure,
+ st->deadlock_failure, latency, lag, &st->attempts);
/* count transactions over the latency limit, if needed */
if (latency_limit && latency > latency_limit)
thread->latency_late++;
}
else
- thread->stats.cnt++;
+ accumMainStats(&thread->stats, skipped, st->serialization_failure,
+ st->deadlock_failure, &st->attempts);
if (use_log)
doLog(thread, st, agg, skipped, latency, lag);
/* XXX could use a mutex here, but we choose not to */
if (per_script_stats)
- accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
+ accumStats(&sql_script[st->use_file].stats, skipped,
+ st->serialization_failure, st->deadlock_failure, latency,
+ lag, &st->attempts);
}
@@ -2985,6 +3359,9 @@ process_sql_command(PQExpBuffer buf, const char *source)
my_command->type = SQL_COMMAND;
my_command->argc = 0;
initSimpleStats(&my_command->stats);
+ my_command->serialization_failures = 0;
+ my_command->deadlock_failures = 0;
+ initSimpleStats(&my_command->attempts);
/*
* If SQL command is multi-line, we only want to save the first line as
@@ -3054,6 +3431,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
my_command->type = META_COMMAND;
my_command->argc = 0;
initSimpleStats(&my_command->stats);
+ my_command->serialization_failures = 0;
+ my_command->deadlock_failures = 0;
+ initSimpleStats(&my_command->attempts);
/* Save first word (command name) */
j = 0;
@@ -3185,6 +3565,60 @@ process_backslash_command(PsqlScanState sstate, const char *source)
}
/*
+ * Returns the same command where all Ñontinuous blocks of whitespaces are
+ * replaced by one space symbol.
+ *
+ * Returns a malloc'd string.
+ */
+static char *
+normalize_whitespaces(const char *command)
+{
+ const char *ptr = command;
+ char *buffer = pg_malloc(strlen(command) + 1);
+ int length = 0;
+
+ while (*ptr)
+ {
+ while (*ptr && !isspace((unsigned char) *ptr))
+ buffer[length++] = *(ptr++);
+ if (isspace((unsigned char) *ptr))
+ {
+ buffer[length++] = ' ';
+ while (isspace((unsigned char) *ptr))
+ ptr++;
+ }
+ }
+ buffer[length] = '\0';
+
+ return buffer;
+}
+
+/*
+ * Returns true if given command generally ends a transaction block (we don't
+ * check here if the last transaction block is already completed).
+ */
+static bool
+is_transaction_block_end(const char *command_text)
+{
+ bool result = false;
+ char *command = normalize_whitespaces(command_text);
+
+ if (pg_strncasecmp(command, "end", 3) == 0 ||
+ (pg_strncasecmp(command, "commit", 6) == 0 &&
+ pg_strncasecmp(command, "commit prepared", 15) != 0) ||
+ (pg_strncasecmp(command, "rollback", 8) == 0 &&
+ pg_strncasecmp(command, "rollback prepared", 17) != 0 &&
+ pg_strncasecmp(command, "rollback to", 11) != 0) ||
+ (pg_strncasecmp(command, "prepare transaction ", 20) == 0 &&
+ pg_strncasecmp(command, "prepare transaction (", 21) != 0 &&
+ pg_strncasecmp(command, "prepare transaction as ", 23) != 0))
+ result = true;
+
+ pg_free(command);
+ return result;
+}
+
+/*
* Parse a script (either the contents of a file, or a built-in script)
* and add it to the list of scripts.
*/
@@ -3196,6 +3630,8 @@ ParseScript(const char *script, const char *desc, int weight)
PQExpBufferData line_buf;
int alloc_num;
int index;
+ int last_transaction_block_begin = -1;
+ bool transaction_block_completed = true;
#define COMMANDS_ALLOC_NUM 128
alloc_num = COMMANDS_ALLOC_NUM;
@@ -3238,6 +3674,8 @@ ParseScript(const char *script, const char *desc, int weight)
command = process_sql_command(&line_buf, desc);
if (command)
{
+ char *command_text = command->argv[0];
+
ps.commands[index] = command;
index++;
@@ -3247,6 +3685,36 @@ ParseScript(const char *script, const char *desc, int weight)
ps.commands = (Command **)
pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
}
+
+ /* check if there's the begin of new transaction */
+ if (transaction_block_completed)
+ {
+ /*
+ * Each sql command outside of transaction block either starts a
+ * new transaction block or is run as separate transaction.
+ */
+ command->is_transaction_begin = true;
+
+ if (pg_strncasecmp(command_text, "begin", 5) == 0 ||
+ pg_strncasecmp(command_text, "start", 5) == 0)
+ {
+ last_transaction_block_begin = index - 1;
+ transaction_block_completed = false;
+ }
+ else
+ {
+ command->transaction_end = index - 1;
+ }
+ }
+
+ /* check if command ends the transaction block */
+ if (!transaction_block_completed &&
+ is_transaction_block_end(command_text))
+ {
+ ps.commands[last_transaction_block_begin]->transaction_end =
+ index - 1;
+ transaction_block_completed = true;
+ }
}
/* If we reached a backslash, process that */
@@ -3272,6 +3740,13 @@ ParseScript(const char *script, const char *desc, int weight)
break;
}
+ if (!transaction_block_completed)
+ {
+ fprintf(stderr, "script \"%s\": last transaction block is not completed\n",
+ desc);
+ exit(1);
+ }
+
ps.commands[index] = NULL;
addScript(ps);
@@ -3474,14 +3949,34 @@ addScript(ParsedScript script)
}
static void
-printSimpleStats(char *prefix, SimpleStats *ss)
+printSimpleStats(char *prefix, SimpleStats *ss, bool print_zeros, double factor,
+ unsigned int decimals_number, char *unit_of_measure)
{
- /* print NaN if no transactions where executed */
- double latency = ss->sum / ss->count;
- double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
+ double average;
+ double stddev;
+ char buffer[256];
+
+ if (print_zeros && ss->count == 0)
+ {
+ average = 0;
+ stddev = 0;
+ }
+ else
+ {
+ /* print NaN if no transactions where executed */
+ average = ss->sum / ss->count;
+ stddev = sqrt(ss->sum2 / ss->count - average * average);
+ }
+
+ if (strlen(unit_of_measure) == 0)
+ snprintf(buffer, sizeof(buffer), "%s %%s = %%.%df\n",
+ prefix, decimals_number);
+ else
+ snprintf(buffer, sizeof(buffer), "%s %%s = %%.%df %s\n",
+ prefix, decimals_number, unit_of_measure);
- printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
- printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
+ printf(buffer, "average", factor * average);
+ printf(buffer, "stddev", factor * stddev);
}
/* print out results */
@@ -3522,6 +4017,16 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
if (total->cnt <= 0)
return;
+ printf("number of transactions with serialization failures: " INT64_FORMAT " (%.3f %%)\n",
+ total->serialization_failures,
+ (100.0 * total->serialization_failures / total->cnt));
+
+ printf("number of transactions with deadlock failures: " INT64_FORMAT " (%.3f %%)\n",
+ total->deadlock_failures,
+ (100.0 * total->deadlock_failures / total->cnt));
+
+ printSimpleStats("attempts number", &total->attempts, true, 1, 2, "");
+
if (throttle_delay && latency_limit)
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
total->skipped,
@@ -3533,7 +4038,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
100.0 * latency_late / (total->skipped + total->cnt));
if (throttle_delay || progress || latency_limit)
- printSimpleStats("latency", &total->latency);
+ printSimpleStats("latency", &total->latency, false, 0.001, 3, "ms");
else
{
/* no measurement, show average latency computed from run time */
@@ -3557,22 +4062,34 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
printf("tps = %f (excluding connections establishing)\n", tps_exclude);
/* Report per-script/command statistics */
- if (per_script_stats || latency_limit || is_latencies)
+ if (per_script_stats || latency_limit || report_per_command)
{
int i;
for (i = 0; i < num_scripts; i++)
{
if (num_scripts > 1)
+ {
printf("SQL script %d: %s\n"
" - weight: %d (targets %.1f%% of total)\n"
- " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
+ " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n"
+ " - number of transactions with serialization failures: " INT64_FORMAT " (%.3f%%)\n"
+ " - number of transactions with deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
i + 1, sql_script[i].desc,
sql_script[i].weight,
100.0 * sql_script[i].weight / total_weight,
sql_script[i].stats.cnt,
100.0 * sql_script[i].stats.cnt / total->cnt,
- sql_script[i].stats.cnt / time_include);
+ sql_script[i].stats.cnt / time_include,
+ sql_script[i].stats.serialization_failures,
+ (100.0 * sql_script[i].stats.serialization_failures /
+ sql_script[i].stats.cnt),
+ sql_script[i].stats.deadlock_failures,
+ (100.0 * sql_script[i].stats.deadlock_failures /
+ sql_script[i].stats.cnt));
+ printSimpleStats(" - attempts number", &total->attempts, true,
+ 1, 2, "");
+ }
else
printf("script statistics:\n");
@@ -3583,22 +4100,39 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
(sql_script[i].stats.skipped + sql_script[i].stats.cnt));
if (num_scripts > 1)
- printSimpleStats(" - latency", &sql_script[i].stats.latency);
-
- /* Report per-command latencies */
- if (is_latencies)
+ printSimpleStats(" - latency", &sql_script[i].stats.latency,
+ false, 0.001, 3, "ms");
+
+ /*
+ * Report per-command statistics: latencies, serialization &
+ * deadlock failures.
+ */
+ if (report_per_command)
{
Command **commands;
- printf(" - statement latencies in milliseconds:\n");
+ printf(" - statement latencies in milliseconds, serialization & deadlock failures, numbers of transactions attempts:\n");
for (commands = sql_script[i].commands;
*commands != NULL;
commands++)
- printf(" %11.3f %s\n",
+ {
+ char buffer[256];
+
+ if ((*commands)->is_transaction_begin)
+ snprintf(buffer, sizeof(buffer), "%8.2f",
+ get_average_attempts(&(*commands)->attempts));
+ else
+ snprintf(buffer, sizeof(buffer), " - ");
+
+ printf(" %11.3f %25" INT64_MODIFIER "d %25" INT64_MODIFIER "d %s %s\n",
1000.0 * (*commands)->stats.sum /
(*commands)->stats.count,
+ (*commands)->serialization_failures,
+ (*commands)->deadlock_failures,
+ buffer,
(*commands)->line);
+ }
}
}
}
@@ -3627,7 +4161,7 @@ main(int argc, char **argv)
{"progress", required_argument, NULL, 'P'},
{"protocol", required_argument, NULL, 'M'},
{"quiet", no_argument, NULL, 'q'},
- {"report-latencies", no_argument, NULL, 'r'},
+ {"report-per-command", no_argument, NULL, 'r'},
{"rate", required_argument, NULL, 'R'},
{"scale", required_argument, NULL, 's'},
{"select-only", no_argument, NULL, 'S'},
@@ -3645,6 +4179,7 @@ main(int argc, char **argv)
{"aggregate-interval", required_argument, NULL, 5},
{"progress-timestamp", no_argument, NULL, 6},
{"log-prefix", required_argument, NULL, 7},
+ {"max-attempts-number", required_argument, NULL, 8},
{NULL, 0, NULL, 0}
};
@@ -3787,7 +4322,7 @@ main(int argc, char **argv)
case 'r':
benchmarking_option_set = true;
per_script_stats = true;
- is_latencies = true;
+ report_per_command = true;
break;
case 's':
scale_given = true;
@@ -3991,6 +4526,16 @@ main(int argc, char **argv)
benchmarking_option_set = true;
logfile_prefix = pg_strdup(optarg);
break;
+ case 8:
+ benchmarking_option_set = true;
+ max_attempts_number = atoi(optarg);
+ if (max_attempts_number <= 0)
+ {
+ fprintf(stderr, "invalid number of maximum attempts: \"%s\"\n",
+ optarg);
+ exit(1);
+ }
+ break;
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
@@ -4259,9 +4804,6 @@ main(int argc, char **argv)
thread->state = &state[nclients_dealt];
thread->nstate =
(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
- thread->random_state[0] = random();
- thread->random_state[1] = random();
- thread->random_state[2] = random();
thread->logfile = NULL; /* filled in later */
thread->latency_late = 0;
initStats(&thread->stats, 0);
@@ -4340,6 +4882,9 @@ main(int argc, char **argv)
mergeSimpleStats(&stats.lag, &thread->stats.lag);
stats.cnt += thread->stats.cnt;
stats.skipped += thread->stats.skipped;
+ stats.serialization_failures += thread->stats.serialization_failures;
+ stats.deadlock_failures += thread->stats.deadlock_failures;
+ mergeSimpleStats(&stats.attempts, &thread->stats.attempts);
latency_late += thread->latency_late;
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
}
@@ -4422,6 +4967,11 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+
+ /* set random state */
+ state[i].random_state[0] = random();
+ state[i].random_state[1] = random();
+ state[i].random_state[2] = random();
}
}
@@ -4613,12 +5163,15 @@ threadRun(void *arg)
/* generate and show report */
StatsData cur;
int64 run = now - last_report;
+ int64 attempts_count;
double tps,
total_run,
latency,
sqlat,
lag,
- stdev;
+ latency_stdev,
+ attempts_average,
+ attempts_stddev;
char tbuf[64];
/*
@@ -4639,6 +5192,10 @@ threadRun(void *arg)
mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
cur.cnt += thread[i].stats.cnt;
cur.skipped += thread[i].stats.skipped;
+ cur.serialization_failures +=
+ thread[i].stats.serialization_failures;
+ cur.deadlock_failures += thread[i].stats.deadlock_failures;
+ mergeSimpleStats(&cur.attempts, &thread[i].stats.attempts);
}
total_run = (now - thread_start) / 1000000.0;
@@ -4647,10 +5204,26 @@ threadRun(void *arg)
(cur.cnt - last.cnt);
sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
/ (cur.cnt - last.cnt);
- stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
+ latency_stdev = 0.001 *
+ sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (cur.lag.sum - last.lag.sum) /
(cur.cnt - last.cnt);
+ attempts_count = cur.attempts.count - last.attempts.count;
+ if (attempts_count == 0)
+ {
+ attempts_average = 0;
+ attempts_stddev = 0;
+ }
+ else
+ {
+ attempts_average = (cur.attempts.sum - last.attempts.sum) /
+ attempts_count;
+ attempts_stddev = sqrt(
+ (cur.attempts.sum2 - last.attempts.sum2) /
+ attempts_count - attempts_average * attempts_average);
+ }
+
if (progress_timestamp)
{
/*
@@ -4669,8 +5242,16 @@ threadRun(void *arg)
snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
fprintf(stderr,
- "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
- tbuf, tps, latency, stdev);
+ "progress: %s, %.1f tps, lat %.3f ms stddev %.3f, failed trx: " INT64_FORMAT " (serialization), " INT64_FORMAT " (deadlocks), attempts avg %.2f stddev %.2f",
+ tbuf,
+ tps,
+ latency,
+ latency_stdev,
+ (cur.serialization_failures -
+ last.serialization_failures),
+ (cur.deadlock_failures - last.deadlock_failures),
+ attempts_average,
+ attempts_stddev);
if (throttle_delay)
{
diff --git a/src/bin/pgbench/t/002_serialization_errors.pl b/src/bin/pgbench/t/002_serialization_errors.pl
new file mode 100644
index 0000000..3c89484
--- /dev/null
+++ b/src/bin/pgbench/t/002_serialization_errors.pl
@@ -0,0 +1,121 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 12;
+
+use constant
+{
+ READ_COMMITTED => 0,
+ REPEATABLE_READ => 1,
+ SERIALIZABLE => 2,
+};
+
+my @isolation_level_sql = ('read committed', 'repeatable read', 'serializable');
+my @isolation_level_shell = (
+ 'read\\ committed',
+ 'repeatable\\ read',
+ 'serializable');
+
+# Test concurrent update in table row with different default transaction
+# isolation levels.
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+$node->safe_psql('postgres',
+ 'CREATE UNLOGGED TABLE xy (x integer, y integer); '
+ . 'INSERT INTO xy VALUES (1, 2);');
+
+my $script = $node->basedir . '/pgbench_script';
+append_to_file($script,
+ "BEGIN;\n"
+ . "\\set delta random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta WHERE x = 1;\n"
+ . "END;");
+
+sub test_pgbench
+{
+ my ($isolation_level) = @_;
+
+ my $isolation_level_sql = $isolation_level_sql[$isolation_level];
+ my $isolation_level_shell = $isolation_level_shell[$isolation_level];
+
+ local $ENV{PGPORT} = $node->port;
+ local $ENV{PGOPTIONS} =
+ "-c default_transaction_isolation=" . $isolation_level_shell;
+ print "# PGOPTIONS: " . $ENV{PGOPTIONS} . "\n";
+
+ my ($h_psql, $in_psql, $out_psql);
+ my ($h_pgbench, $in_pgbench, $out_pgbench, $stderr);
+
+ # Open the psql session and run the parallel transaction:
+ print "# Starting psql\n";
+ $h_psql = IPC::Run::start [ 'psql' ], \$in_psql, \$out_psql;
+
+ $in_psql = "begin;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /BEGIN/;
+
+ $in_psql = "update xy set y = y + 1 where x = 1;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /UPDATE 1/;
+
+ # Start pgbench:
+ my @command = (qw(pgbench --no-vacuum --file), $script);
+ print "# Running: " . join(" ", @command) . "\n";
+ $h_pgbench = IPC::Run::start \@command, \$in_pgbench, \$out_pgbench,
+ \$stderr;
+
+ # Let pgbench run the update command in the transaction:
+ sleep 10;
+
+ # In psql, commit the transaction and end the session:
+ $in_psql = "end;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /COMMIT/;
+
+ $in_psql = "\\q\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() while length $in_psql;
+
+ $h_psql->finish();
+
+ # Get pgbench results
+ $h_pgbench->pump() until length $out_pgbench;
+ $h_pgbench->finish();
+
+ # On Windows, the exit status of the process is returned directly as the
+ # process's exit code, while on Unix, it's returned in the high bits
+ # of the exit code (see WEXITSTATUS macro in the standard <sys/wait.h>
+ # header file). IPC::Run's result function always returns exit code >> 8,
+ # assuming the Unix convention, which will always return 0 on Windows as
+ # long as the process was not terminated by an exception. To work around
+ # that, use $h->full_result on Windows instead.
+ my $result =
+ ($Config{osname} eq "MSWin32")
+ ? ($h_pgbench->full_results)[0]
+ : $h_pgbench->result(0);
+
+ # Check pgbench results
+ ok(!$result, "@command exit code 0");
+ is($stderr, '', "@command no stderr");
+
+ like($out_pgbench,
+ qr{processed: 10/10},
+ "concurrent update: $isolation_level_sql: check processed transactions");
+
+ my $regex =
+ ($isolation_level == READ_COMMITTED)
+ ? qr{serialization failures: 0 \(0\.000 %\)}
+ : qr{serialization failures: [1-9]\d* \([1-9]\d*\.\d* %\)};
+
+ like($out_pgbench,
+ $regex,
+ "concurrent update: $isolation_level_sql: check serialization failures");
+}
+
+test_pgbench(READ_COMMITTED);
+test_pgbench(REPEATABLE_READ);
+test_pgbench(SERIALIZABLE);
diff --git a/src/bin/pgbench/t/003_deadlock_errors.pl b/src/bin/pgbench/t/003_deadlock_errors.pl
new file mode 100644
index 0000000..8d92f78
--- /dev/null
+++ b/src/bin/pgbench/t/003_deadlock_errors.pl
@@ -0,0 +1,130 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 21;
+
+use constant
+{
+ READ_COMMITTED => 0,
+ REPEATABLE_READ => 1,
+ SERIALIZABLE => 2,
+};
+
+my @isolation_level_sql = ('read committed', 'repeatable read', 'serializable');
+my @isolation_level_shell = (
+ 'read\\ committed',
+ 'repeatable\\ read',
+ 'serializable');
+
+# Test concurrent deadlock updates in table with different default transaction
+# isolation levels.
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+$node->safe_psql('postgres',
+ 'CREATE UNLOGGED TABLE xy (x integer, y integer); '
+ . 'INSERT INTO xy VALUES (1, 2), (2, 3);');
+
+my $script1 = $node->basedir . '/pgbench_script1';
+append_to_file($script1,
+ "BEGIN;\n"
+ . "\\set delta1 random(-5000, 5000)\n"
+ . "\\set delta2 random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+ . "SELECT pg_sleep(20);\n"
+ . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+ . "END;");
+
+my $script2 = $node->basedir . '/pgbench_script2';
+append_to_file($script2,
+ "BEGIN;\n"
+ . "\\set delta1 random(-5000, 5000)\n"
+ . "\\set delta2 random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+ . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+ . "END;");
+
+sub test_pgbench
+{
+ my ($isolation_level) = @_;
+
+ my $isolation_level_sql = $isolation_level_sql[$isolation_level];
+ my $isolation_level_shell = $isolation_level_shell[$isolation_level];
+
+ local $ENV{PGPORT} = $node->port;
+ local $ENV{PGOPTIONS} =
+ "-c default_transaction_isolation=" . $isolation_level_shell;
+ print "# PGOPTIONS: " . $ENV{PGOPTIONS} . "\n";
+
+ my ($h1, $in1, $out1, $err1);
+ my ($h2, $in2, $out2, $err2);
+
+ # Run first pgbench
+ my @command1 = (qw(pgbench --no-vacuum --transactions=1 --file), $script1);
+ print "# Running: " . join(" ", @command1) . "\n";
+ $h1 = IPC::Run::start \@command1, \$in1, \$out1, \$err1;
+
+ # Let pgbench run first update command in the transaction:
+ sleep 10;
+
+ # Run second pgbench
+ my @command2 = (qw(pgbench --no-vacuum --transactions=1 --file), $script2);
+ print "# Running: " . join(" ", @command2) . "\n";
+ $h2 = IPC::Run::start \@command2, \$in2, \$out2, \$err2;
+
+ # Get all pgbench results
+ $h1->pump() until length $out1;
+ $h1->finish();
+
+ $h2->pump() until length $out2;
+ $h2->finish();
+
+ # On Windows, the exit status of the process is returned directly as the
+ # process's exit code, while on Unix, it's returned in the high bits
+ # of the exit code (see WEXITSTATUS macro in the standard <sys/wait.h>
+ # header file). IPC::Run's result function always returns exit code >> 8,
+ # assuming the Unix convention, which will always return 0 on Windows as
+ # long as the process was not terminated by an exception. To work around
+ # that, use $h->full_result on Windows instead.
+ my $result1 =
+ ($Config{osname} eq "MSWin32")
+ ? ($h1->full_results)[0]
+ : $h1->result(0);
+
+ my $result2 =
+ ($Config{osname} eq "MSWin32")
+ ? ($h2->full_results)[0]
+ : $h2->result(0);
+
+ # Check all pgbench results
+ ok(!$result1, "@command1 exit code 0");
+ ok(!$result2, "@command2 exit code 0");
+
+ is($err1, '', "@command1 no stderr");
+ is($err2, '', "@command2 no stderr");
+
+ like($out1,
+ qr{processed: 1/1},
+ "concurrent deadlock update: "
+ . $isolation_level_sql
+ . ": pgbench 1: check processed transactions");
+ like($out2,
+ qr{processed: 1/1},
+ "concurrent deadlock update: "
+ . $isolation_level_sql
+ . ": pgbench 2: check processed transactions");
+
+ # First or second pgbench should get a deadlock error
+ like($out1 . $out2,
+ qr{deadlock failures: 1 \(100\.000 %\)},
+ "concurrent deadlock update: "
+ . $isolation_level_sql
+ . ": check deadlock failures");
+}
+
+test_pgbench(READ_COMMITTED);
+test_pgbench(REPEATABLE_READ);
+test_pgbench(SERIALIZABLE);
diff --git a/src/bin/pgbench/t/004_retry_failed_transactions.pl b/src/bin/pgbench/t/004_retry_failed_transactions.pl
new file mode 100644
index 0000000..01a9ab2
--- /dev/null
+++ b/src/bin/pgbench/t/004_retry_failed_transactions.pl
@@ -0,0 +1,280 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 30;
+
+use constant
+{
+ READ_COMMITTED => 0,
+ REPEATABLE_READ => 1,
+ SERIALIZABLE => 2,
+};
+
+my @isolation_level_sql = ('read committed', 'repeatable read', 'serializable');
+my @isolation_level_shell = (
+ 'read\\ committed',
+ 'repeatable\\ read',
+ 'serializable');
+
+# Test concurrent update in table row with different default transaction
+# isolation levels.
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+$node->safe_psql('postgres',
+ 'CREATE UNLOGGED TABLE xy (x integer, y integer); '
+ . 'INSERT INTO xy VALUES (1, 2), (2, 3);');
+
+my $script = $node->basedir . '/pgbench_script';
+append_to_file($script,
+ "BEGIN;\n"
+ . "\\set delta random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta WHERE x = 1;\n"
+ . "END;");
+
+my $script1 = $node->basedir . '/pgbench_script1';
+append_to_file($script1,
+ "BEGIN;\n"
+ . "\\set delta1 random(-5000, 5000)\n"
+ . "\\set delta2 random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+ . "SELECT pg_sleep(20);\n"
+ . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+ . "END;");
+
+my $script2 = $node->basedir . '/pgbench_script2';
+append_to_file($script2,
+ "BEGIN;\n"
+ . "\\set delta1 random(-5000, 5000)\n"
+ . "\\set delta2 random(-5000, 5000)\n"
+ . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+ . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+ . "END;");
+
+sub test_pgbench_serialization_failures
+{
+ my ($isolation_level) = @_;
+
+ my $isolation_level_sql = $isolation_level_sql[$isolation_level];
+ my $isolation_level_shell = $isolation_level_shell[$isolation_level];
+
+ local $ENV{PGPORT} = $node->port;
+ local $ENV{PGOPTIONS} =
+ "-c default_transaction_isolation=" . $isolation_level_shell;
+ print "# PGOPTIONS: " . $ENV{PGOPTIONS} . "\n";
+
+ my ($h_psql, $in_psql, $out_psql);
+ my ($h_pgbench, $in_pgbench, $out_pgbench, $stderr);
+
+ # Open the psql session and run the parallel transaction:
+ print "# Starting psql\n";
+ $h_psql = IPC::Run::start [ 'psql' ], \$in_psql, \$out_psql;
+
+ $in_psql ="begin;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /BEGIN/;
+
+ $in_psql = "update xy set y = y + 1 where x = 1;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /UPDATE 1/;
+
+ # Start pgbench:
+ my @command = (
+ qw(pgbench --no-vacuum --max-attempts-number 2 --debug --file),
+ $script);
+ print "# Running: " . join(" ", @command) . "\n";
+ $h_pgbench = IPC::Run::start \@command, \$in_pgbench, \$out_pgbench,
+ \$stderr;
+
+ # Let pgbench run the update command in the transaction:
+ sleep 10;
+
+ # In psql, commit the transaction and end the session:
+ $in_psql = "end;\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() until $out_psql =~ /COMMIT/;
+
+ $in_psql = "\\q\n";
+ print "# Running in psql: " . join(" ", $in_psql);
+ $h_psql->pump() while length $in_psql;
+
+ $h_psql->finish();
+
+ # Get pgbench results
+ $h_pgbench->pump() until length $out_pgbench;
+ $h_pgbench->finish();
+
+ # On Windows, the exit status of the process is returned directly as the
+ # process's exit code, while on Unix, it's returned in the high bits
+ # of the exit code (see WEXITSTATUS macro in the standard <sys/wait.h>
+ # header file). IPC::Run's result function always returns exit code >> 8,
+ # assuming the Unix convention, which will always return 0 on Windows as
+ # long as the process was not terminated by an exception. To work around
+ # that, use $h->full_result on Windows instead.
+ my $result =
+ ($Config{osname} eq "MSWin32")
+ ? ($h_pgbench->full_results)[0]
+ : $h_pgbench->result(0);
+
+ # Check pgbench results
+ ok(!$result, "@command exit code 0");
+
+ like($out_pgbench,
+ qr{processed: 10/10},
+ "concurrent update with retrying: "
+ . $isolation_level_sql
+ . ": check processed transactions");
+
+ like($out_pgbench,
+ qr{serialization failures: 0 \(0\.000 %\)},
+ "concurrent update with retrying: "
+ . $isolation_level_sql
+ . ": check serialization failures");
+
+ my $pattern =
+ "client 0 sending UPDATE xy SET y = y \\+ (-?\\d+) WHERE x = 1;\n"
+ . "(client 0 receiving\n)+"
+ . "client 0 got a serialization failure \\(attempt 1/2\\)\n"
+ . "client 0 sending END;\n"
+ . "\\g2+"
+ . "client 0 repeats the failed transaction \\(attempt 2/2\\)\n"
+ . "client 0 sending BEGIN;\n"
+ . "\\g2+"
+ . "client 0 executing \\\\set delta\n"
+ . "client 0 sending UPDATE xy SET y = y \\+ \\g1 WHERE x = 1;";
+
+ like($stderr,
+ qr{$pattern},
+ "concurrent update with retrying: "
+ . $isolation_level_sql
+ . ": check the retried transaction");
+}
+
+sub test_pgbench_deadlock_failures
+{
+ my ($isolation_level) = @_;
+
+ my $isolation_level_sql = $isolation_level_sql[$isolation_level];
+ my $isolation_level_shell = $isolation_level_shell[$isolation_level];
+
+ local $ENV{PGPORT} = $node->port;
+ local $ENV{PGOPTIONS} =
+ "-c default_transaction_isolation=" . $isolation_level_shell;
+
+ my ($h1, $in1, $out1, $err1);
+ my ($h2, $in2, $out2, $err2);
+
+ # Run first pgbench
+ my @command1 = (
+ qw(pgbench --no-vacuum --transactions=1 --max-attempts-number=2),
+ qw(--debug --file), $script1);
+ print "# Running: " . join(" ", @command1) . "\n";
+ $h1 = IPC::Run::start \@command1, \$in1, \$out1, \$err1;
+
+ # Let pgbench run first update command in the transaction:
+ sleep 10;
+
+ # Run second pgbench
+ my @command2 = (
+ qw(pgbench --no-vacuum --transactions=1 --max-attempts-number=2),
+ qw(--debug --file), $script2);
+ print "# Running: " . join(" ", @command2) . "\n";
+ $h2 = IPC::Run::start \@command2, \$in2, \$out2, \$err2;
+
+ # Get all pgbench results
+ $h1->pump() until length $out1;
+ $h1->finish();
+
+ $h2->pump() until length $out2;
+ $h2->finish();
+
+ # On Windows, the exit status of the process is returned directly as the
+ # process's exit code, while on Unix, it's returned in the high bits
+ # of the exit code (see WEXITSTATUS macro in the standard <sys/wait.h>
+ # header file). IPC::Run's result function always returns exit code >> 8,
+ # assuming the Unix convention, which will always return 0 on Windows as
+ # long as the process was not terminated by an exception. To work around
+ # that, use $h->full_result on Windows instead.
+ my $result1 =
+ ($Config{osname} eq "MSWin32")
+ ? ($h1->full_results)[0]
+ : $h1->result(0);
+
+ my $result2 =
+ ($Config{osname} eq "MSWin32")
+ ? ($h2->full_results)[0]
+ : $h2->result(0);
+
+ # Check all pgbench results
+ ok(!$result1, "@command1 exit code 0");
+ ok(!$result2, "@command2 exit code 0");
+
+ like($out1,
+ qr{processed: 1/1},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": pgbench 1: check processed transactions");
+ like($out2,
+ qr{processed: 1/1},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": pgbench 2: check processed transactions");
+
+ like($out1,
+ qr{deadlock failures: 0 \(0\.000 %\)},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": pgbench 1: check deadlock failures");
+ like($out2,
+ qr{deadlock failures: 0 \(0\.000 %\)},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": pgbench 2: check deadlock failures");
+
+ # First or second pgbench should get a deadlock error
+ like($err1 . $err2,
+ qr{client 0 got a deadlock failure},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": check deadlock failure in debug logs");
+
+ if ($isolation_level == READ_COMMITTED)
+ {
+ my $pattern =
+ "client 0 sending UPDATE xy SET y = y \\+ (-?\\d+) WHERE x = (\\d);\n"
+ . "(client 0 receiving\n)+"
+ . "(|client 0 sending SELECT pg_sleep\\(20\\);\n)"
+ . "\\g3*"
+ . "client 0 sending UPDATE xy SET y = y \\+ (-?\\d+) WHERE x = (\\d);\n"
+ . "\\g3+"
+ . "client 0 got a deadlock failure \\(attempt 1/2\\)\n"
+ . "client 0 sending END;\n"
+ . "\\g3+"
+ . "client 0 repeats the failed transaction \\(attempt 2/2\\)\n"
+ . "client 0 sending BEGIN;\n"
+ . "\\g3+"
+ . "client 0 executing \\\\set delta1\n"
+ . "client 0 executing \\\\set delta2\n"
+ . "client 0 sending UPDATE xy SET y = y \\+ \\g1 WHERE x = \\g2;\n"
+ . "\\g3+"
+ . "\\g4"
+ . "\\g3*"
+ . "client 0 sending UPDATE xy SET y = y \\+ \\g5 WHERE x = \\g6;\n";
+
+ like($err1 . $err2,
+ qr{$pattern},
+ "concurrent deadlock update with retrying: "
+ . $isolation_level_sql
+ . ": check the retried transaction");
+ }
+}
+
+test_pgbench_serialization_failures(REPEATABLE_READ);
+test_pgbench_serialization_failures(SERIALIZABLE);
+
+test_pgbench_deadlock_failures(READ_COMMITTED);
+test_pgbench_deadlock_failures(REPEATABLE_READ);
+test_pgbench_deadlock_failures(SERIALIZABLE);
--
1.9.1
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers