Vadim Tkachenko has proposed merging lp:~vadim-tk/sysbench/inj-rate into
lp:sysbench.
Requested reviews:
Alexey Kopytov (akopytov)
For more details, see:
https://code.launchpad.net/~vadim-tk/sysbench/inj-rate/+merge/94662
This is a proof of concept rather than a final proposal.
I am submitting this merge proposal to seek feedback on what changes need to be made.
--
https://code.launchpad.net/~vadim-tk/sysbench/inj-rate/+merge/94662
Your team sysbench-developers is subscribed to branch lp:sysbench.
=== modified file 'sysbench/db_driver.c'
--- sysbench/db_driver.c 2011-09-26 13:54:56 +0000
+++ sysbench/db_driver.c 2012-02-25 18:13:19 +0000
@@ -77,6 +77,9 @@
static void db_update_thread_stats(int, db_query_type_t);
static void db_reset_stats(void);
+extern int queue_counter;
+extern int current_concurrency;
+
/* DB layer arguments */
static sb_arg_t db_args[] =
@@ -830,15 +833,15 @@
seconds = NS2SEC(sb_timer_split(&sb_globals.exec_timer));
log_timestamp(LOG_NOTICE, &sb_globals.exec_timer,
- "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f "
- "response time: %4.2fms (%u%%)",
+ "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f, "
+ "response time: %4.2fms (%u%%), queue size: %d, concurrency: %d",
sb_globals.num_threads,
(transactions - last_transactions) / seconds,
(read_ops - last_read_ops) / seconds,
(write_ops - last_write_ops) / seconds,
NS2MS(sb_percentile_calculate(&local_percentile,
sb_globals.percentile_rank)),
- sb_globals.percentile_rank);
+ sb_globals.percentile_rank, queue_counter, current_concurrency);
SB_THREAD_MUTEX_LOCK();
last_transactions = transactions;
=== modified file 'sysbench/sb_timer.c'
--- sysbench/sb_timer.c 2011-07-21 16:45:22 +0000
+++ sysbench/sb_timer.c 2012-02-25 18:13:19 +0000
@@ -35,7 +35,7 @@
static inline void sb_timer_update(sb_timer_t *t)
{
SB_GETTIME(&t->time_end);
- t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start);
+ t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start) + t->queue_time;
}
/* initialize timer */
@@ -59,6 +59,7 @@
t->sum_time = 0;
t->events = 0;
t->elapsed = 0;
+ t->queue_time = 0;
}
=== modified file 'sysbench/sb_timer.h'
--- sysbench/sb_timer.h 2011-07-21 16:45:22 +0000
+++ sysbench/sb_timer.h 2012-02-25 18:13:19 +0000
@@ -77,6 +77,7 @@
unsigned long long max_time;
unsigned long long sum_time;
unsigned long long events;
+ unsigned long long queue_time;
timer_state_t state;
} sb_timer_t;
=== modified file 'sysbench/sysbench.c'
--- sysbench/sysbench.c 2011-09-26 13:54:56 +0000
+++ sysbench/sysbench.c 2012-02-25 18:13:19 +0000
@@ -108,8 +108,6 @@
SB_ARG_TYPE_STRING, "off"},
{"thread-stack-size", "size of stack per thread", SB_ARG_TYPE_SIZE, "64K"},
{"tx-rate", "target transaction rate (tps)", SB_ARG_TYPE_INT, "0"},
- {"tx-jitter", "target transaction variation, in microseconds",
- SB_ARG_TYPE_INT, "0"},
{"report-interval", "periodically report intermediate statistics "
"with a specified interval in seconds. 0 disables intermediate reports",
SB_ARG_TYPE_INT, "0"},
@@ -151,6 +149,15 @@
static pthread_mutex_t thread_start_mutex;
static pthread_attr_t thread_attr;
+/* structures to handle queue of events, needed for tx_rate mode */
+static pthread_mutex_t queue_mutex;
+static pthread_cond_t queue_cv;
+int queue_counter = 0;
+int current_concurrency = 0;
+static int queue_is_full = 0;
+#define MAX_QUEUE_LEN 100000
+static unsigned long long queue_data[MAX_QUEUE_LEN];
+
static void print_header(void);
static void print_usage(void);
static void print_run_mode(sb_test_t *);
@@ -199,6 +206,8 @@
static int execute_request(sb_test_t *test, sb_request_t *r,int thread_id)
{
unsigned int rc;
+
+
if (test->ops.execute_request != NULL)
rc = test->ops.execute_request(r, thread_id);
@@ -361,8 +370,7 @@
if (sb_globals.tx_rate > 0)
{
log_text(LOG_NOTICE,
- "Target transaction rate: %d/sec, with jitter %d usec",
- sb_globals.tx_rate, sb_globals.tx_jitter);
+ "Target transaction rate: %d/sec", sb_globals.tx_rate);
}
if (sb_globals.report_interval)
@@ -434,15 +442,13 @@
sb_thread_ctxt_t *ctxt;
sb_test_t *test;
unsigned int thread_id;
- long long period_ns = 0;
- long long jitter_ns = 0;
- long long pause_ns;
- struct timespec target_tv, now_tv;
-
+ int queue_loop;
+ unsigned long long queue_start_time;
+
ctxt = (sb_thread_ctxt_t *)arg;
test = ctxt->test;
thread_id = ctxt->id;
-
+
log_text(LOG_DEBUG, "Runner thread started (%d)!", thread_id);
if (test->ops.thread_init != NULL && test->ops.thread_init(thread_id) != 0)
{
@@ -450,17 +456,6 @@
return NULL; /* thread initialization failed */
}
- if (sb_globals.tx_rate > 0)
- {
- /* initialize tx_rate variables */
- period_ns = floor(1e9 / sb_globals.tx_rate * sb_globals.num_threads + 0.5);
- if (sb_globals.tx_jitter > 0)
- jitter_ns = sb_globals.tx_jitter * 1000;
- else
- /* Default jitter is 1/10th of the period */
- jitter_ns = period_ns / 10;
- }
-
/*
We do this to make sure all threads get to this barrier
about the same time
@@ -469,25 +464,54 @@
sb_globals.num_running++;
pthread_mutex_unlock(&thread_start_mutex);
- if (sb_globals.tx_rate > 0)
- {
- /* we are time-rating transactions */
- SB_GETTIME(&target_tv);
- /* For the first transaction - ramp up */
- pause_ns = period_ns / sb_globals.num_threads * thread_id;
- add_ns_to_timespec(&target_tv, period_ns);
- usleep(pause_ns / 1000);
- }
-
do
{
+
+ /* If we are in tx_rate mode, we take events from queue */
+ if (sb_globals.tx_rate > 0)
+ {
+ if (queue_is_full)
+ {
+ log_errno(LOG_FATAL, "Event queue is full.");
+ break;
+ }
+ pthread_mutex_lock (&queue_mutex);
+ while(!queue_counter)
+ pthread_cond_wait (&queue_cv, &queue_mutex);
+
+ queue_start_time = queue_data[0];
+ /* This is probably a rather inefficient way to handle the queue;
+ may need to use a copy() function */
+
+ for (queue_loop=0; queue_loop < queue_counter; queue_loop++)
+ queue_data[queue_loop] = queue_data[queue_loop+1];
+
+ queue_counter--;
+
+ pthread_mutex_unlock(&queue_mutex);
+
+ (&timers[thread_id])->queue_time = sb_timer_value(&sb_globals.exec_timer) - queue_start_time;
+
+ /* we do it without mutex protection, that's fine to have racing */
+ current_concurrency++;
+ }
+
+
request = get_request(test, thread_id);
+
/* check if we shall execute it */
if (request.type != SB_REQ_TYPE_NULL)
{
if (execute_request(test, &request, thread_id))
break; /* break if error returned (terminates only one thread) */
}
+
+ if (sb_globals.tx_rate > 0)
+ {
+ /* we do it without mutex protection, that's fine to have racing */
+ current_concurrency--;
+ }
+
/* Check if we have a time limit */
if (sb_globals.max_time != 0 &&
sb_timer_value(&sb_globals.exec_timer) >= SEC2NS(sb_globals.max_time))
@@ -496,17 +520,6 @@
break;
}
- /* check if we are time-rating transactions and need to pause */
- if (sb_globals.tx_rate > 0)
- {
- add_ns_to_timespec(&target_tv, period_ns);
- SB_GETTIME(&now_tv);
- pause_ns = TIMESPEC_DIFF(target_tv, now_tv) - (jitter_ns / 2) +
- (sb_rnd() % jitter_ns);
- if (pause_ns > 5000)
- usleep(pause_ns / 1000);
- }
-
} while ((request.type != SB_REQ_TYPE_NULL) && (!sb_globals.error) );
if (test->ops.thread_done != NULL)
@@ -519,6 +532,60 @@
return NULL;
}
+/*
+  Draw the next inter-arrival interval for the event generator, in
+  nanoseconds. The value is exponentially distributed with
+  Lambda = sb_globals.tx_rate events per second (inverse-transform
+  sampling). Must only be called when sb_globals.tx_rate > 0.
+*/
+static unsigned long long eventgen_interval_ns(void)
+{
+  /*
+    Dividing by SB_MAX_RND + 1 keeps u strictly below 1 as long as
+    sb_rnd() <= SB_MAX_RND (assumed -- TODO confirm), so log(1 - u) stays
+    finite and the conversion to an integer is well defined.
+  */
+  double u = (double)sb_rnd() / ((double)SB_MAX_RND + 1.0);
+
+  return (unsigned long long)(-log(1.0 - u) / (double)sb_globals.tx_rate * 1e9);
+}
+
+/*
+  Event generator thread for tx_rate mode.
+
+  Appends the current value of sb_globals.exec_timer to queue_data at
+  exponentially distributed intervals and signals queue_cv after each
+  append. When the queue reaches MAX_QUEUE_LEN entries, queue_is_full is
+  set, a fatal message is logged and the thread returns.
+
+  arg is unused. The return value is always NULL.
+*/
+static void *eventgen_thread_proc(void *arg)
+{
+  unsigned long long pause_ns;
+  unsigned long long next_ns;
+  unsigned long long curr_ns;
+  int                full;
+
+  (void)arg; /* unused */
+
+  log_text(LOG_DEBUG, "Event generating thread started");
+
+  /* Block until thread_start_mutex can be acquired, then release it */
+  pthread_mutex_lock(&thread_start_mutex);
+  pthread_mutex_unlock(&thread_start_mutex);
+
+  if (sb_globals.tx_rate <= 0)
+  {
+    /*
+      Not in tx_rate mode: there is nothing to generate, and computing an
+      interval would divide by zero. This thread is cancelled and joined
+      unconditionally at shutdown, so idle at a cancellation point instead
+      of returning (cancelling an already finished thread may fail with
+      ESRCH on some libc versions).
+    */
+    for (;;)
+      pause();
+  }
+
+  next_ns = sb_timer_value(&sb_globals.exec_timer);
+
+  for (;;)
+  {
+    /*
+      Schedule relative to the previous target rather than to "now", so
+      that oversleeping in one iteration shortens the next pause.
+    */
+    next_ns += eventgen_interval_ns();
+    curr_ns = sb_timer_value(&sb_globals.exec_timer);
+
+    if (next_ns > curr_ns)
+      pause_ns = next_ns - curr_ns;
+    else
+      pause_ns = 1000; /* behind schedule: pause for the minimum of 1 us */
+
+    usleep(pause_ns / 1000);
+
+    pthread_mutex_lock(&queue_mutex);
+    /* Never write past the end of queue_data */
+    if (queue_counter < MAX_QUEUE_LEN)
+      queue_data[queue_counter++] = sb_timer_value(&sb_globals.exec_timer);
+    if (queue_counter >= MAX_QUEUE_LEN)
+      queue_is_full = 1;
+    /* Take a copy under the mutex; the flag is tested after unlocking */
+    full = queue_is_full;
+    pthread_cond_signal(&queue_cv);
+    pthread_mutex_unlock(&queue_mutex);
+
+    if (full)
+    {
+      /* Not an errno condition, so use log_text() rather than log_errno() */
+      log_text(LOG_FATAL, "Event queue is full.");
+      return NULL;
+    }
+  }
+
+  return NULL;
+}
/* Intermediate reports thread */
@@ -627,6 +694,7 @@
int err;
pthread_t report_thread;
pthread_t checkpoints_thread;
+ pthread_t eventgen_thread;
int report_thread_created = 0;
int checkpoints_thread_created = 0;
@@ -652,6 +720,12 @@
return 1;
pthread_mutex_init(&sb_globals.exec_mutex, NULL);
+
+
+ pthread_mutex_init(&queue_mutex, NULL);
+ pthread_cond_init(&queue_cv, NULL);
+ queue_counter = 0;
+ queue_is_full = 0;
/* start mutex used for barrier */
pthread_mutex_init(&thread_start_mutex,NULL);
@@ -686,6 +760,13 @@
report_thread_created = 1;
}
+ if ((err = pthread_create(&eventgen_thread, &thread_attr, &eventgen_thread_proc,
+ NULL)) != 0)
+ {
+ log_errno(LOG_FATAL, "pthread_create() for the reporting thread failed.");
+ return 1;
+ }
+
if (sb_globals.n_checkpoints > 0)
{
/* Create a thread for checkpoint statistic reports */
@@ -768,6 +849,10 @@
if (pthread_cancel(report_thread) || pthread_join(report_thread, NULL))
log_errno(LOG_FATAL, "Terminating the reporting thread failed.");
}
+
+ if (pthread_cancel(eventgen_thread) || pthread_join(eventgen_thread, NULL))
+ log_errno(LOG_FATAL, "Terminating the event generator thread failed.");
+
if (checkpoints_thread_created)
{
if (pthread_cancel(checkpoints_thread) ||
@@ -914,7 +999,6 @@
rand_res = sb_get_value_int("rand-spec-res");
sb_globals.tx_rate = sb_get_value_int("tx-rate");
- sb_globals.tx_jitter = sb_get_value_int("tx-jitter");
sb_globals.report_interval = sb_get_value_int("report-interval");
sb_globals.n_checkpoints = 0;
=== modified file 'sysbench/sysbench.h'
--- sysbench/sysbench.h 2011-07-21 17:02:14 +0000
+++ sysbench/sysbench.h 2012-02-25 18:13:19 +0000
@@ -97,6 +97,7 @@
{
int type;
struct sb_test_t *test;
+ unsigned long long start_time_queue; /* bad hack, need to look how to fix */
/* type-specific data */
union
_______________________________________________
Mailing list: https://launchpad.net/~sysbench-developers
Post to : [email protected]
Unsubscribe : https://launchpad.net/~sysbench-developers
More help : https://help.launchpad.net/ListHelp