On 2020-04-07 05:15:03 -0700, Andres Freund wrote:
> Attached is a substantially polished version of my patches. Note that
> the first three patches, as well as the last, are not intended to be
> committed at this time / in this form - they're there to make testing
> easier.

I didn't actually attach that last not-to-be-committed patch... It's
just the pgbench patch that I had attached before (and started a thread
about). Here it is again.
>From 59a9a03da728d53364f9c3d6fe8b48e21697b93e Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 6 Apr 2020 21:28:55 -0700
Subject: [PATCH v7 12/12] WIP: pgbench

---
 src/bin/pgbench/pgbench.c | 107 +++++++++++++++++++++++++++++---------
 1 file changed, 83 insertions(+), 24 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e99af801675..21d1ab2aac1 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -310,6 +310,10 @@ typedef struct RandomState
 /* Various random sequences are initialized from this one. */
 static RandomState base_random_sequence;
 
+#ifdef ENABLE_THREAD_SAFETY
+pthread_barrier_t conn_barrier;
+#endif
+
 /*
  * Connection state machine states.
  */
@@ -5206,6 +5210,10 @@ printResults(StatsData *total, instr_time total_time,
 	tps_exclude = ntx /
 		(time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
 
+	//fprintf(stderr, "time: include: %f, exclude: %f, conn total: %f\n",
+	//		time_include, time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients),
+	//		INSTR_TIME_GET_DOUBLE(conn_total_time));
+
 	/* Report test parameters. */
 	printf("transaction type: %s\n",
 		   num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
@@ -6126,26 +6134,14 @@ main(int argc, char **argv)
 	/* all clients must be assigned to a thread */
 	Assert(nclients_dealt == nclients);
 
-	/* get start up time */
-	INSTR_TIME_SET_CURRENT(start_time);
-
-	/* set alarm if duration is specified. */
-	if (duration > 0)
-		setalarm(duration);
-
 	/* start threads */
 #ifdef ENABLE_THREAD_SAFETY
+	pthread_barrier_init(&conn_barrier, NULL, nthreads);
+
 	for (i = 0; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
 
-		INSTR_TIME_SET_CURRENT(thread->start_time);
-
-		/* compute when to stop */
-		if (duration > 0)
-			end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
-				(int64) 1000000 * duration;
-
 		/* the first thread (i = 0) is executed by main thread */
 		if (i > 0)
 		{
@@ -6162,13 +6158,38 @@ main(int argc, char **argv)
 			thread->thread = INVALID_THREAD;
 		}
 	}
-#else
-	INSTR_TIME_SET_CURRENT(threads[0].start_time);
-	/* compute when to stop */
+#endif							/* ENABLE_THREAD_SAFETY */
+
+#ifdef ENABLE_THREAD_SAFETY
+	/* wait till all threads started (threads wait in threadRun()) */
+	//fprintf(stderr, "andres: waiting for thread start: %u\n", threads[0].tid);
+	pthread_barrier_wait(&conn_barrier);
+#endif							/* ENABLE_THREAD_SAFETY */
+
+	/* get start up time */
+	INSTR_TIME_SET_CURRENT(start_time);
+
+	/* hand the common start time to every thread */
+	for (i = 0; i < nthreads; i++)
+	{
+		TState	   *thread = &threads[i];
+
+		thread->start_time = start_time;
+
+		/* compute when to stop */
+		if (duration > 0)
+			end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
+				(int64) 1000000 * duration;
+	}
+
+	/* set alarm if duration is specified. */
 	if (duration > 0)
-		end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
-			(int64) 1000000 * duration;
-	threads[0].thread = INVALID_THREAD;
+		setalarm(duration);
+
+#ifdef ENABLE_THREAD_SAFETY
+	/* start time is now set; release the threads waiting in threadRun() */
+	//fprintf(stderr, "andres: %u: waiting for start time\n", threads[0].tid);
+	pthread_barrier_wait(&conn_barrier);
 #endif							/* ENABLE_THREAD_SAFETY */
 
 	/* wait for threads and accumulate results */
@@ -6236,12 +6257,30 @@ threadRun(void *arg)
 	int			i;
 
 	/* for reporting progress: */
-	int64		thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
-	int64		last_report = thread_start;
-	int64		next_report = last_report + (int64) progress * 1000000;
+	int64		thread_start;
+	int64		last_report;
+	int64		next_report;
 	StatsData	last,
 				aggs;
 
+	/* wait till all threads started (main waits outside) */
+	if (thread->tid != 0)
+	{
+		//fprintf(stderr, "andres: %u: waiting for thread start\n", thread->tid);
+		pthread_barrier_wait(&conn_barrier);
+	}
+
+	/* wait for start time to be initialized (main waits outside) */
+	if (thread->tid != 0)
+	{
+		//fprintf(stderr, "andres: %u: waiting for start time\n", thread->tid);
+		pthread_barrier_wait(&conn_barrier);
+	}
+
+	thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
+	last_report = thread_start;
+	next_report = last_report + (int64) progress * 1000000;
+
 	/*
 	 * Initialize throttling rate target for all of the thread's clients.  It
 	 * might be a little more accurate to reset thread->start_time here too.
@@ -6288,7 +6327,27 @@ threadRun(void *arg)
 
 	/* time after thread and connections set up */
 	INSTR_TIME_SET_CURRENT(thread->conn_time);
-	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
+	INSTR_TIME_SUBTRACT(thread->conn_time, start);
+
+	//	e = thread->conn_time;
+	//fprintf(stderr, "andres: %u: connection established in %f (s %f, e %f)\n",
+	//		thread->tid, INSTR_TIME_GET_DOUBLE(thread->conn_time),
+	//		INSTR_TIME_GET_DOUBLE(e),
+	//		INSTR_TIME_GET_DOUBLE(start));
+
+	/* add once for each other connection */
+	if (!is_connect)
+	{
+		instr_time e = thread->conn_time;
+		for (i = 0; i < (nstate - 1); i++)
+		{
+			INSTR_TIME_ADD(thread->conn_time, e);
+		}
+	}
+
+	/* wait for all connections to be established */
+	//fprintf(stderr, "andres: %u: waiting for connection establishment\n", thread->tid);
+	pthread_barrier_wait(&conn_barrier);
 
 	/* explicitly initialize the state machines */
 	for (i = 0; i < nstate; i++)
-- 
2.25.0.114.g5b0ca878e0

Reply via email to