Hello Heikki,
I did that in the attached version: no more environment-variable hack, and no execution shortcut even if there is nothing to do. I also had to reproduce the progress logic so that progress reports (showing no progress) keep being printed during this tailing phase.
On second thoughts, there's one problem with this approach of always waiting until -T is up. What if all the threads died because of errors? For example:
Good corner-case catch! This behavior is indeed silly.
I don't think you want to wait in that situation. I think we should wait at the end only if there are some threads still alive that have nothing to do only because of --rate.
Yep. The attached version does the tailing wait only under -R, and only when not all threads were stopped by errors, with comments explaining why.
I'm still wondering about adding a specific option to explicitly request this behavioral change.
-- Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 41b756c089..a128016196 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -5597,6 +5597,96 @@ main(int argc, char **argv) return 0; } +/* display the progress report */ +static void +doProgressReport(TState *thread, StatsData *plast, int64 *plast_report, + int64 now, int64 thread_start) +{ + StatsData cur; + int64 run = now - *plast_report, + ntx; + double tps, + total_run, + latency, + sqlat, + lag, + stdev; + char tbuf[64]; + int i; + + /* + * Add up the statistics of all threads. + * + * XXX: No locking. There is no guarantee that we get an + * atomic snapshot of the transaction count and latencies, so + * these figures can well be off by a small amount. The + * progress report's purpose is to give a quick overview of + * how the test is going, so that shouldn't matter too much. + * (If a read from a 64-bit integer is not atomic, you might + * get a "torn" read and completely bogus latencies though!) 
+ */ + initStats(&cur, 0); + for (i = 0; i < nthreads; i++) + { + mergeSimpleStats(&cur.latency, &thread[i].stats.latency); + mergeSimpleStats(&cur.lag, &thread[i].stats.lag); + cur.cnt += thread[i].stats.cnt; + cur.skipped += thread[i].stats.skipped; + } + + /* we count only actually executed transactions */ + ntx = (cur.cnt - cur.skipped) - (plast->cnt - plast->skipped); + total_run = (now - thread_start) / 1000000.0; + tps = 1000000.0 * ntx / run; + if (ntx > 0) + { + latency = 0.001 * (cur.latency.sum - plast->latency.sum) / ntx; + sqlat = 1.0 * (cur.latency.sum2 - plast->latency.sum2) / ntx; + stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); + lag = 0.001 * (cur.lag.sum - plast->lag.sum) / ntx; + } + else + { + latency = sqlat = stdev = lag = 0; + } + + if (progress_timestamp) + { + /* + * On some platforms the current system timestamp is + * available in now_time, but rather than get entangled + * with that, we just eat the cost of an extra syscall in + * all cases. + */ + struct timeval tv; + + gettimeofday(&tv, NULL); + snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s", + (long) tv.tv_sec, (long) (tv.tv_usec / 1000)); + } + else + { + /* round seconds are expected, but the thread may be late */ + snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run); + } + + fprintf(stderr, + "progress: %s, %.1f tps, lat %.3f ms stddev %.3f", + tbuf, tps, latency, stdev); + + if (throttle_delay) + { + fprintf(stderr, ", lag %.3f ms", lag); + if (latency_limit) + fprintf(stderr, ", " INT64_FORMAT " skipped", + cur.skipped - plast->skipped); + } + fprintf(stderr, "\n"); + + *plast = cur; + *plast_report = now; +} + static void * threadRun(void *arg) { @@ -5606,6 +5696,7 @@ threadRun(void *arg) end; int nstate = thread->nstate; int remains = nstate; /* number of remaining clients */ + int aborted = 0; int i; /* for reporting progress: */ @@ -5844,6 +5935,10 @@ threadRun(void *arg) /* If doCustom changed client to finished state, reduce remains */ if (st->state == 
CSTATE_FINISHED || st->state == CSTATE_ABORTED) remains--; + + /* count aborted clients */ + if (st->state == CSTATE_ABORTED) + aborted++; } /* progress report is made by thread 0 for all threads */ @@ -5856,93 +5951,15 @@ threadRun(void *arg) now = INSTR_TIME_GET_MICROSEC(now_time); if (now >= next_report) { - /* generate and show report */ - StatsData cur; - int64 run = now - last_report, - ntx; - double tps, - total_run, - latency, - sqlat, - lag, - stdev; - char tbuf[315]; - - /* - * Add up the statistics of all threads. - * - * XXX: No locking. There is no guarantee that we get an - * atomic snapshot of the transaction count and latencies, so - * these figures can well be off by a small amount. The - * progress report's purpose is to give a quick overview of - * how the test is going, so that shouldn't matter too much. - * (If a read from a 64-bit integer is not atomic, you might - * get a "torn" read and completely bogus latencies though!) - */ - initStats(&cur, 0); - for (i = 0; i < nthreads; i++) - { - mergeSimpleStats(&cur.latency, &thread[i].stats.latency); - mergeSimpleStats(&cur.lag, &thread[i].stats.lag); - cur.cnt += thread[i].stats.cnt; - cur.skipped += thread[i].stats.skipped; - } - - /* we count only actually executed transactions */ - ntx = (cur.cnt - cur.skipped) - (last.cnt - last.skipped); - total_run = (now - thread_start) / 1000000.0; - tps = 1000000.0 * ntx / run; - if (ntx > 0) - { - latency = 0.001 * (cur.latency.sum - last.latency.sum) / ntx; - sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) / ntx; - stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (cur.lag.sum - last.lag.sum) / ntx; - } - else - { - latency = sqlat = stdev = lag = 0; - } - - if (progress_timestamp) - { - /* - * On some platforms the current system timestamp is - * available in now_time, but rather than get entangled - * with that, we just eat the cost of an extra syscall in - * all cases. 
- */ - struct timeval tv; - - gettimeofday(&tv, NULL); - snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s", - (long) tv.tv_sec, (long) (tv.tv_usec / 1000)); - } - else - { - /* round seconds are expected, but the thread may be late */ - snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run); - } - - fprintf(stderr, - "progress: %s, %.1f tps, lat %.3f ms stddev %.3f", - tbuf, tps, latency, stdev); - - if (throttle_delay) - { - fprintf(stderr, ", lag %.3f ms", lag); - if (latency_limit) - fprintf(stderr, ", " INT64_FORMAT " skipped", - cur.skipped - last.skipped); - } - fprintf(stderr, "\n"); - - last = cur; - last_report = now; + doProgressReport(thread, &last, + &last_report, now, thread_start); /* * Ensure that the next report is in the future, in case - * pgbench/postgres got stuck somewhere. + * pgbench/postgres got stuck somewhere. This may skip + * some progress reports if the thread does not get enough + * cpu time. In such case, probably the whole bench should + * be ignored. */ do { @@ -5953,6 +5970,51 @@ threadRun(void *arg) } done: + /* + * Under -R, comply with -T and -P even if there is nothing to do, + * (unless all clients aborted) and ensure that one report is printed. + * This special behavior allows tap tests to check that a run lasts + * as expected and that some progress is shown, even on very slow hosts. 
+ */ + if (duration && throttle_delay && aborted < nstate && thread->tid == 0) + { + int64 thread_end = thread_start + (int64) duration * 1000000; + instr_time now_time; + int64 now; + + INSTR_TIME_SET_CURRENT(now_time); + now = INSTR_TIME_GET_MICROSEC(now_time); + + while (now < thread_end) + { + if (progress && next_report <= thread_end) + { + pg_usleep(next_report - now); + INSTR_TIME_SET_CURRENT(now_time); + now = INSTR_TIME_GET_MICROSEC(now_time); + doProgressReport(thread, &last, &last_report, now, thread_start); + /* schedule next report */ + do + { + next_report += (int64) progress * 1000000; + } while (now >= next_report); + } + else + { + pg_usleep(thread_end - now); + INSTR_TIME_SET_CURRENT(now_time); + now = INSTR_TIME_GET_MICROSEC(now_time); + } + } + + /* + * Print a closing progress report if none were printed + * and at least one was expected. + */ + if (progress && progress <= duration && last_report == thread_start) + doProgressReport(thread, &last, &last_report, now, thread_start); + } + INSTR_TIME_SET_CURRENT(start); disconnect_all(state, nstate); INSTR_TIME_SET_CURRENT(end); diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 2fc021dde7..e73b9c62bb 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -4,13 +4,14 @@ use warnings; use PostgresNode; use TestLib; use Test::More; +use Time::HiRes qw{time}; # start a pgbench specific server my $node = get_new_node('main'); $node->init; $node->start; -# invoke pgbench +# invoke pgbench, return elapsed time sub pgbench { local $Test::Builder::Level = $Test::Builder::Level + 1; @@ -40,12 +41,14 @@ sub pgbench append_to_file($filename, $$files{$fn}); } } + + my $t0 = time(); $node->command_checks_all(\@cmd, $stat, $out, $err, $name); # cleanup? 
#unlink @filenames or die "cannot unlink files (@filenames): $!"; - return; + return time() - $t0; } # Test concurrent insertion into table with UNIQUE oid column. DDL expects @@ -828,7 +831,44 @@ sub check_pgbench_logs my $bdir = $node->basedir; -# with sampling rate +# The point of this test is coverage of various time-related features +# (-T, -P, --aggregate-interval, --rate, --latency-limit...), so it is +# somehow time sensitive. +# The checks performed are quite loose so as to still pass even under +# degraded (high load, slow host, valgrind run) testing conditions. +# The main point is to provide coverage, most of the time. +my $elapsed = pgbench( + "-T 2 -P 1 -l --log-prefix=$bdir/001_pgbench_log_1 --aggregate-interval=1" + . ' -S -b se@2 --rate=20 --latency-limit=1000 -j ' . $nthreads + . ' -c 3 -r', + 0, + [ qr{type: multiple}, + qr{clients: 3}, + qr{threads: $nthreads}, + # the shown duration is really -T argument value + qr{duration: 2 s}, + qr{script 1: .* select only}, + qr{script 2: .* select only}, + qr{statement latencies in milliseconds}, + qr{FROM pgbench_accounts} ], + [ qr{vacuum}, + qr{progress: \d\b} ], + 'pgbench progress'); + +# this depends on pgbench to avoid a shortcut under --rate +# if there is nothing to do. +ok($elapsed >= 2.0, "-T 2 leads to at least 2 second run"); + +# if the test has gone AWOL, coldly skip these detailed checks. +if (abs($elapsed - 2.0) < 0.5) +{ + # $nthreads threads, 2 seconds, but due to timing imprecision we might get + # only 1 or as many as 3 progress reports per thread. + check_pgbench_logs("$bdir/001_pgbench_log_1", $nthreads, 1, 3, + qr{^\d+ \d{1,2} \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+$}); +} + +# with sampling rate, not time sensitive pgbench( "-n -S -t 50 -c 2 --log --log-prefix=$bdir/001_pgbench_log_2 --sampling-rate=0.5", 0, @@ -836,6 +876,8 @@ pgbench( [qr{^$}], 'pgbench logs'); +# random 50% of 2*50 transactions, accept between 8 and 92 +# probability of failure is about 2 * 2^-42 (?) 
check_pgbench_logs("$bdir/001_pgbench_log_2", 1, 8, 92, qr{^0 \d{1,2} \d+ \d \d+ \d+$});