*** pgbench.c	Wed Jul  8 13:56:57 2009
--- pgbench-mt.c	Wed Jul  8 14:18:25 2009
***************
*** 55,64 ****
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
  extern char *optarg;
  extern int	optind;
  
- 
  /********************************************************************
   * some configurable parameters */
  
--- 55,74 ----
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
+ #ifdef WIN32
+ typedef struct win32_pthread   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ #elif defined(ENABLE_THREAD_SAFETY)
+ #include <pthread.h>
+ #else
+ #define pthread_t	int
+ #endif
+ 
  extern char *optarg;
  extern int	optind;
  
  /********************************************************************
   * some configurable parameters */
  
***************
*** 71,77 ****
  
  #define DEFAULT_NXACTS	10		/* default nxacts */
  
- int			nclients = 1;		/* default number of simulated clients */
  int			nxacts = 0;			/* number of transactions per client */
  int			duration = 0;		/* duration in seconds */
  
--- 81,86 ----
***************
*** 99,106 ****
  
  bool		use_log;			/* log transaction latencies to a file */
  
- int			remains;			/* number of remaining clients */
- 
  int			is_connect;			/* establish connection  for each transaction */
  
  char	   *pghost = "";
--- 108,113 ----
***************
*** 144,149 ****
--- 151,168 ----
  } CState;
  
  /*
+  * Thread state
+  */
+ typedef struct
+ {
+ 	pthread_t		thread;		/* thread handle */
+ 	CState		   *state;		/* array of CState */
+ 	int				nstate;		/* length of state */
+ } TState;
+ 
+ #define INVALID_THREAD		((pthread_t) 0)
+ 
+ /*
   * queries read from files
   */
  #define SQL_COMMAND		1
***************
*** 168,175 ****
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! Command   **sql_files[MAX_FILES];		/* SQL script files */
! int			num_files;			/* number of script files */
  
  /* default scenario */
  static char *tpc_b = {
--- 187,195 ----
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! static Command	  **sql_files[MAX_FILES];	/* SQL script files */
! static int			num_files;				/* number of script files */
! static int			debug = 0;				/* debug flag */
  
  /* default scenario */
  static char *tpc_b = {
***************
*** 217,223 ****
  
  /* Function prototypes */
  static void setalarm(int seconds);
! 
  
  /* Calculate total time */
  static void
--- 237,243 ----
  
  /* Function prototypes */
  static void setalarm(int seconds);
! static void* threadRun(void *arg);
  
  /* Calculate total time */
  static void
***************
*** 376,404 ****
  	} while (res);
  }
  
- /* check to see if the SQL result was good */
- static int
- check(CState *state, PGresult *res, int n)
- {
- 	CState	   *st = &state[n];
- 
- 	switch (PQresultStatus(res))
- 	{
- 		case PGRES_COMMAND_OK:
- 		case PGRES_TUPLES_OK:
- 			/* OK */
- 			break;
- 		default:
- 			fprintf(stderr, "Client %d aborted in state %d: %s",
- 					n, st->state, PQerrorMessage(st->con));
- 			remains--;			/* I've aborted */
- 			PQfinish(st->con);
- 			st->con = NULL;
- 			return (-1);
- 	}
- 	return (0);					/* OK */
- }
- 
  static int
  compareVariables(const void *v1, const void *v2)
  {
--- 396,401 ----
***************
*** 595,605 ****
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static void
! doCustom(CState *state, int n, int debug)
  {
  	PGresult   *res;
- 	CState	   *st = &state[n];
  	Command   **commands;
  
  top:
--- 592,615 ----
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static bool
! clientDone(CState *st, bool ok)
! {
! 	(void) ok;	/* unused */
! 
! 	if (st->con != NULL)
! 	{
! 		PQfinish(st->con);
! 		st->con = NULL;
! 	}
! 	return false;	/* always false */
! }
! 
! /* return false iff client should be disconnected */
! static bool
! doCustom(CState *st)
  {
  	PGresult   *res;
  	Command   **commands;
  
  top:
***************
*** 616,622 ****
  		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return;				/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
--- 626,632 ----
  		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return true;		/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
***************
*** 624,640 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", n);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
! 				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
! 				return;
  			}
  			if (PQisBusy(st->con))
! 				return;			/* don't have the whole result yet */
  		}
  
  		/*
--- 634,647 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", st->id);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
! 				return clientDone(st, false);
  			}
  			if (PQisBusy(st->con))
! 				return true;	/* don't have the whole result yet */
  		}
  
  		/*
***************
*** 657,666 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			if (check(state, res, n))
  			{
! 				PQclear(res);
! 				return;
  			}
  			PQclear(res);
  			discard_response(st);
--- 664,679 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			switch (PQresultStatus(res))
  			{
! 				case PGRES_COMMAND_OK:
! 				case PGRES_TUPLES_OK:
! 					break;	/* OK */
! 				default:
! 					fprintf(stderr, "Client %d aborted in state %d: %s",
! 						st->id, st->state, PQerrorMessage(st->con));
! 					PQclear(res);
! 					return clientDone(st, false);
  			}
  			PQclear(res);
  			discard_response(st);
***************
*** 676,690 ****
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 			{
! 				remains--;		/* I've done */
! 				if (st->con != NULL)
! 				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 				}
! 				return;
! 			}
  		}
  
  		/* increment state counter */
--- 689,695 ----
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				return clientDone(st, true);	/* exit success */
  		}
  
  		/* increment state counter */
***************
*** 706,717 ****
  		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n",
! 					n);
! 			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
! 			return;
  		}
  		gettimeofday(&t2, NULL);
  		diffTime(&t2, &t1, &t3);
--- 711,718 ----
  		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
! 			return clientDone(st, false);
  		}
  		gettimeofday(&t2, NULL);
  		diffTime(&t2, &t1, &t3);
***************
*** 735,745 ****
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
--- 736,746 ----
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return true;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
***************
*** 751,757 ****
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
--- 752,758 ----
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
***************
*** 785,791 ****
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
--- 786,792 ----
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
***************
*** 795,801 ****
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
  			st->ecnt++;
  		}
  		else
--- 796,802 ----
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
  			st->ecnt++;
  		}
  		else
***************
*** 809,815 ****
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", n, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
--- 810,816 ----
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
***************
*** 828,834 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				min = atoi(var);
  			}
--- 829,835 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				min = atoi(var);
  			}
***************
*** 850,856 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  				max = atoi(var);
  			}
--- 851,857 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  				max = atoi(var);
  			}
***************
*** 861,867 ****
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return;
  			}
  
  #ifdef DEBUG
--- 862,868 ----
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return true;
  			}
  
  #ifdef DEBUG
***************
*** 873,879 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 874,880 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
***************
*** 891,897 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				ope1 = atoi(var);
  			}
--- 892,898 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				ope1 = atoi(var);
  			}
***************
*** 908,914 ****
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return;
  					}
  					ope2 = atoi(var);
  				}
--- 909,915 ----
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return true;
  					}
  					ope2 = atoi(var);
  				}
***************
*** 927,933 ****
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
--- 928,934 ----
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return true;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
***************
*** 935,941 ****
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  			}
  
--- 936,942 ----
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  			}
  
***************
*** 943,949 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 944,950 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
***************
*** 960,966 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return;
  				}
  				usec = atoi(var);
  			}
--- 961,967 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return true;
  				}
  				usec = atoi(var);
  			}
***************
*** 987,1004 ****
  
  		goto top;
  	}
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state)
  {
  	int			i;
  
! 	for (i = 0; i < nclients; i++)
  	{
  		if (state[i].con)
  			PQfinish(state[i].con);
  	}
  }
  
--- 988,1010 ----
  
  		goto top;
  	}
+ 
+ 	return true;
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state, int length)
  {
  	int			i;
  
! 	for (i = 0; i < length; i++)
  	{
  		if (state[i].con)
+ 		{
  			PQfinish(state[i].con);
+ 			state[i].con = NULL;
+ 		}
  	}
  }
  
***************
*** 1450,1457 ****
  
  /* print out results */
  static void
! printResults(
! 			 int ttype, CState *state,
  			 struct timeval * start_time, struct timeval * end_time)
  {
  	double		t1,
--- 1456,1462 ----
  
  /* print out results */
  static void
! printResults(int ttype, const CState *state, int nclients, int nthreads,
  			 struct timeval * start_time, struct timeval * end_time)
  {
  	double		t1,
***************
*** 1483,1488 ****
--- 1488,1494 ----
  	printf("scaling factor: %d\n", scale);
  	printf("query mode: %s\n", QUERYMODE[querymode]);
  	printf("number of clients: %d\n", nclients);
+ 	printf("number of threads: %d\n", nthreads);
  	if (duration <= 0)
  	{
  		printf("number of transactions per client: %d\n", nxacts);
***************
*** 1504,1532 ****
  main(int argc, char **argv)
  {
  	int			c;
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- 	int			debug = 0;		/* debug flag */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
  
  	struct timeval start_time;	/* start up time */
  	struct timeval end_time;	/* end time */
  
  	int			i;
  
- 	fd_set		input_mask;
- 	int			nsocks;			/* return from select(2) */
- 	int			maxsock;		/* max socket number to be waited */
- 	struct timeval now;
- 	struct timeval timeout;
- 	int			min_usec;
- 
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
--- 1510,1533 ----
  main(int argc, char **argv)
  {
  	int			c;
+ 	int			nclients = 1;		/* default number of simulated clients */
+ 	int			nthreads = 1;		/* default number of threads */
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
+ 	TState	   *threads;		/* array of thread */
  
  	struct timeval start_time;	/* start up time */
  	struct timeval end_time;	/* end time */
  
  	int			i;
  
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
***************
*** 1576,1582 ****
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
  	{
  		switch (c)
  		{
--- 1577,1583 ----
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
  	{
  		switch (c)
  		{
***************
*** 1629,1634 ****
--- 1630,1650 ----
  				}
  #endif   /* HAVE_GETRLIMIT */
  				break;
+ 			case 'j':	/* jobs */
+ 				nthreads = atoi(optarg);
+ 				if (nthreads <= 0)
+ 				{
+ 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ 					exit(1);
+ 				}
+ #ifndef ENABLE_THREAD_SAFETY
+ 				if (nthreads > 1)
+ 				{
+ 					fprintf(stderr, "multi-threading is not supported\n");
+ 					exit(1);
+ 				}
+ #endif
+ 				break;
  			case 'C':
  				is_connect = 1;
  				break;
***************
*** 1749,1755 ****
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	remains = nclients;
  
  	if (nclients > 1)
  	{
--- 1765,1775 ----
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	if (nclients % nthreads != 0)
! 	{
! 		fprintf(stderr, "number of client (%d) must be multiple number of threads (%d)\n", nclients, nthreads);
! 		exit(1);
! 	}
  
  	if (nclients > 1)
  	{
***************
*** 1926,1974 ****
  			break;
  	}
  
  	/* send start up queries in async manner */
! 	for (i = 0; i < nclients; i++)
  	{
! 		Command   **commands = sql_files[state[i].use_file];
! 		int			prev_ecnt = state[i].ecnt;
  
! 		state[i].use_file = getrand(0, num_files - 1);
! 		doCustom(state, i, debug);
  
! 		if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
  			remains--;			/* I've aborted */
! 			PQfinish(state[i].con);
! 			state[i].con = NULL;
  		}
  	}
  
! 	for (;;)
  	{
! 		if (remains <= 0)
! 		{						/* all done ? */
! 			disconnect_all(state);
! 			/* get end time */
! 			gettimeofday(&end_time, NULL);
! 			printResults(ttype, state, &start_time, &end_time);
! 			if (LOGFILE)
! 				fclose(LOGFILE);
! 			exit(0);
! 		}
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
  		min_usec = -1;
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
  
! 			if (state[i].sleeping)
  			{
  				int			this_usec;
! 				int			sock = PQsocket(state[i].con);
  
  				if (min_usec < 0)
  				{
--- 1946,2048 ----
  			break;
  	}
  
+ 	/* start threads */
+ 	threads = (TState *) malloc(sizeof(TState) * nthreads);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ #ifdef ENABLE_THREAD_SAFETY
+ 		/* the first thread (i = 0) is executed by main thread */
+ 		if (i > 0)
+ 		{
+ 			int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ 			if (err != 0)
+ 			{
+ 				fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ 				exit(1);
+ 			}
+ 		}
+ 		else
+ #endif
+ 		{
+ 			threads[i].thread = INVALID_THREAD;
+ 		}
+ 
+ 		threads[i].state = &state[nclients / nthreads * i];
+ 		threads[i].nstate = nclients / nthreads;
+ 	}
+ 
+ 	/* wait for threads */
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		if (threads[i].thread == INVALID_THREAD)
+ 			threadRun(&threads[i]);
+ #ifdef ENABLE_THREAD_SAFETY
+ 		else
+ 			pthread_join(threads[i].thread, NULL);
+ #endif
+ 	}
+ 	disconnect_all(state, nclients);
+ 
+ 	/* get end time */
+ 	gettimeofday(&end_time, NULL);
+ 	printResults(ttype, state, nclients, nthreads, &start_time, &end_time);
+ 	if (LOGFILE)
+ 		fclose(LOGFILE);
+ 
+ 	return 0;
+ }
+ 
+ static void*
+ threadRun(void *arg)
+ {
+ 	TState *thread = (TState *) arg;
+ 	CState *state = thread->state;
+ 	int		nstate = thread->nstate;
+ 	int		remains = nstate;	/* number of remaining clients */
+ 	int		i;
+ 
  	/* send start up queries in async manner */
! 	for (i = 0; i < nstate; i++)
  	{
! 		CState	   *st = &state[i];
! 		Command   **commands = sql_files[st->use_file];
! 		int			prev_ecnt = st->ecnt;
  
! 		st->use_file = getrand(0, num_files - 1);
! 		if (!doCustom(st))
! 			remains--;		/* I've aborted */
  
! 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
  			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
  		}
  	}
  
! 	while (remains > 0)
  	{
! 		fd_set			input_mask;
! 		int				nsocks;			/* return from select(2) */
! 		int				maxsock;		/* max socket number to be waited */
! 		struct timeval	now;
! 		struct timeval	timeout;
! 		int				min_usec;
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
  		min_usec = -1;
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
  
! 			if (st->sleeping)
  			{
  				int			this_usec;
! 				int			sock = PQsocket(st->con);
  
  				if (min_usec < 0)
  				{
***************
*** 1976,2002 ****
  					min_usec = 0;
  				}
  
! 				this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
! 					state[i].until.tv_usec - now.tv_usec;
  
  				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
  
! 				FD_SET		(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
  			}
! 			else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(state[i].con);
  
  				if (sock < 0)
  				{
! 					disconnect_all(state);
! 					exit(1);
  				}
! 				FD_SET		(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
--- 2050,2076 ----
  					min_usec = 0;
  				}
  
! 				this_usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
! 					st->until.tv_usec - now.tv_usec;
  
  				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
  
! 				FD_SET(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
  			}
! 			else if (st->con && commands[st->state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(st->con);
  
  				if (sock < 0)
  				{
! 					disconnect_all(state, nstate);
! 					return NULL;
  				}
! 				FD_SET(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
***************
*** 2021,2065 ****
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
! 				disconnect_all(state);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				exit(1);
! 			}
! #ifdef NOT_USED
! 			else if (nsocks == 0)
! 			{					/* timeout */
! 				fprintf(stderr, "select timeout\n");
! 				for (i = 0; i < nclients; i++)
! 				{
! 					fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
! 							i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
! 				}
! 				exit(0);
  			}
- #endif
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
! 			int			prev_ecnt = state[i].ecnt;
  
! 			if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
! 						  || commands[state[i].state]->type == META_COMMAND))
  			{
! 				doCustom(state, i, debug);
  			}
  
! 			if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
  				remains--;		/* I've aborted */
! 				PQfinish(state[i].con);
! 				state[i].con = NULL;
  			}
  		}
  	}
  }
  
  
--- 2095,2131 ----
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
! 				disconnect_all(state, nstate);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				return NULL;
  			}
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			prev_ecnt = st->ecnt;
  
! 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
! 						  || commands[st->state]->type == META_COMMAND))
  			{
! 				if (!doCustom(st))
! 					remains--;		/* I've aborted */
  			}
  
! 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
  				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
  			}
  		}
  	}
+ 
+ 	return NULL;
  }
  
  
***************
*** 2105,2110 ****
--- 2171,2242 ----
  		fprintf(stderr, "Failed to set timer\n");
  		exit(1);
  	}
+ }
+ 
+ /* partial pthread implementation for Windows */
+ 
+ typedef struct win32_pthread
+ {
+ 	HANDLE		handle;
+ 	void	   *(*routine)(void *);
+ 	void	   *arg;
+ 	void	   *result;
+ } win32_pthread;
+ 
+ static unsigned __stdcall
+ win32_pthread_run(void *arg)
+ {
+ 	win32_pthread *th = (win32_pthread *) arg;
+ 
+ 	th->result = th->routine(th->arg);
+ 
+ 	return 0;
+ }
+ 
+ static int
+ pthread_create(pthread_t *thread,
+ 			   pthread_attr_t *attr,
+ 			   void * (*start_routine)(void *),
+ 			   void * arg)
+ {
+ 	int				save_errno;
+ 	win32_pthread   *th;
+ 
+ 	th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ 	th->routine = start_routine;
+ 	th->arg = arg;
+ 	th->result = NULL;
+ 
+ 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ 	if (th->handle == NULL)
+ 	{
+ 		save_errno = errno;
+ 		free(th);
+ 		return save_errno;
+ 	}
+ 
+ 	*thread = th;
+ 	return 0;
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	if (th == NULL || th->handle == NULL)
+ 		return errno = EINVAL;
+ 
+ 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ 	{
+ 		_dosmaperr(GetLastError());
+ 		return errno;
+ 	}
+ 
+ 	if (thread_return)
+ 		*thread_return = th->result;
+ 
+ 	CloseHandle(th->handle);
+ 	free(th);
+ 	return 0;
  }
  
  #endif   /* WIN32 */
