Hello, thank you for the comments.

This is the second version of the patch.

- Refactored to make the code simpler and clearer.
- Added comments about the logic outline and the struct members.
- Removed trailing whitespace.

- No additional tests yet.


======
> warning: 3 lines add whitespace errors.

Whoops. Fixed.

> 2. The patches compile cleanly.
> 3. The regression is clean, even in contrib/postgres_fdw and
> contrib/file_fdw
> 
> Tests
> -------
> We need tests to make sure that the logic remains intact even after further
> changes in this area. Couple of tests which require multiple foreign scans
> within the same query fetching rows more than fetch size (100) would be
> required. Also, some DMLs, which involve multiple foreign scans would test
> the sanity when UPDATE/DELETE interleave such scans. By multiple foreign
> scans I mean both multiple scans on a single foreign server and multiple
> scans spread across multiple foreign servers.

Additional tests might indeed be needed. Some of the tests related
to this patch are implicitly covered by the present regression
tests, but there are no explicit ones.

fetch_size is currently a hard-coded constant, so I don't think it
is necessary to test other fetch sizes. Even if a different size
could cause a problem, it will be found when a different value is
actually used.

In the current design, an async scan is started only for the
first scan on a connection. If a subsequent scan or modify claims
the same connection, the async state is finished immediately and
the behavior is the same as the unpatched version. But since
asynchronous/parallel scanning is introduced in some form, such
tests do seem to be needed.
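To make the rule above concrete, here is a minimal, self-contained C model of it. It is only a sketch under simplifying assumptions: SharedConn, acquire_conn(), begin_scan(), release_conn() and finish_async() are hypothetical names standing in for PgFdwConn, GetConnection(), postgresBeginForeignScan(), ReleaseConnection() and finish_async_connection(), and no real libpq I/O is performed.

```c
#include <assert.h>

/*
 * Hypothetical, simplified model of the connection-sharing rule.
 * Names loosely follow the patch; this is not the patch's code.
 */
typedef enum
{
	CONN_IDLE,					/* no query in flight */
	CONN_ASYNC_RUNNING,			/* a FETCH was sent PQsendQuery()-style */
	CONN_SYNC_RUNNING			/* a FETCH is being run synchronously */
} ConnState;

typedef struct
{
	int			nscans;			/* scan/modify nodes using this connection */
	ConnState	state;
	int			in_flight;		/* 1 while an async FETCH awaits its result */
} SharedConn;

/* Collect the pending async result into the owning scan (modelled only). */
static void
finish_async(SharedConn *conn)
{
	if (conn->state == CONN_ASYNC_RUNNING)
	{
		conn->in_flight = 0;
		conn->state = CONN_IDLE;
	}
}

/* Like GetConnection(): a second claimant finishes the running async fetch. */
static void
acquire_conn(SharedConn *conn)
{
	if (++conn->nscans > 1)
		finish_async(conn);
}

/* Like postgresBeginForeignScan(): only the sole user starts an async fetch. */
static void
begin_scan(SharedConn *conn)
{
	if (conn->nscans == 1 && conn->state == CONN_IDLE)
	{
		conn->in_flight = 1;
		conn->state = CONN_ASYNC_RUNNING;
	}
}

/* Like ReleaseConnection(): the last user finishes any async work left. */
static void
release_conn(SharedConn *conn)
{
	assert(conn->nscans > 0);
	if (--conn->nscans == 0)
		finish_async(conn);
}
```

In this model the second scan's acquire_conn() collects the in-flight result, and later fetches on the shared connection run synchronously; that interleaving is the case an explicit regression test would need to cover.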

Multi-server tests are not done in the unpatched version either,
and there's no difference between multiple foreign servers on the
same remote server and ones distributed over multiple remotes. The
async scan in this patch works only within a single foreign server,
so there seems to be no need for such a test. Do you have any
specific concern about this?

Anyway, I will add some explicit tests for async cancellation in
the next patch.

> Code
> -------
> Because previous "conn" is now replaced by "conn->pgconn", the double
> indirection makes the code a bit ugly and prone to segfaults (conn being
> NULL or invalid pointer). Can we minimize such code or wrap it under a
> macro?

Agreed. It was an annoyance for me too. In the second version of
this patch I've done the following to encapsulate PgFdwConn to
some extent; the changes are described below.

> We need some comments about the structure definition of PgFdwConn and its
> members explaining the purpose of this structure and its members.

Thank you for pointing that out; I had forgotten it. I added brief
comments there.

> Same is the case with enum PgFdwConnState. In fact, the state diagram of a
> connection has become more complicated with the async connections, so it
> might be better to explain that state diagram at one place in the code
> (through comments). The definition of the enum might be a good place to do
> that.

I added a comment describing the logic and the meaning of the
states just above the enum declaration.
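For illustration, a state diagram of the kind described could look roughly like the following. This is a hypothetical sketch, not the comment actually in postgres_fdw.h: the FDW_CONN_* names and the transition_allowed()/set_state() helpers are invented here, and the transitions are read off the two switch statements in fetch_more_data() of the attached patch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical sketch of a commented state enum.  Transitions mirror
 * fetch_more_data() in the patch:
 *
 *   IDLE ----(sole scan: send FETCH)--------> ASYNC_RUNNING
 *   IDLE ----(shared: run FETCH)------------> SYNC_RUNNING
 *   ASYNC_RUNNING --(rows, not EOF)---------> ASYNC_RUNNING (next FETCH sent)
 *   ASYNC_RUNNING --(rows, EOF)-------------> IDLE
 *   ASYNC_RUNNING --(another scan joined)---> SYNC_RUNNING
 *   SYNC_RUNNING ---(rows processed)--------> IDLE
 */
typedef enum
{
	FDW_CONN_IDLE,
	FDW_CONN_ASYNC_RUNNING,
	FDW_CONN_SYNC_RUNNING
} FdwConnState;

/* Return true if the diagram above allows moving from 'from' to 'to'. */
static bool
transition_allowed(FdwConnState from, FdwConnState to)
{
	switch (from)
	{
		case FDW_CONN_IDLE:
			return to == FDW_CONN_ASYNC_RUNNING || to == FDW_CONN_SYNC_RUNNING;
		case FDW_CONN_ASYNC_RUNNING:
			return true;		/* may continue, finish, or fall back to sync */
		case FDW_CONN_SYNC_RUNNING:
			return to == FDW_CONN_IDLE;
	}
	return false;
}

/* Move to a new state, asserting that the move appears in the diagram. */
static void
set_state(FdwConnState *state, FdwConnState to)
{
	assert(transition_allowed(*state, to));
	*state = to;
}
```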

> Otherwise, the logic of connection maintenance is spread at multiple
> places and is difficult to understand by looking at the code.
> 
> In function GetConnection(), at line
>         elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
> -            entry->conn, server->servername);
> +            entry->conn->pgconn, server->servername);

Thank you. I replaced the occurrences of conn in this form with PFC_PGCONN(conn).

> entry->conn->pgconn may not necessarily be a new connection and may be NULL
> (as the next line check it for being NULL). So, I think this line should be
> moved within the following if block after pgconn has been initialised with
> the new connection.
> +   if (entry->conn->pgconn == NULL)
> +   {
> +       entry->conn->pgconn = connect_pg_server(server, user);
> +       entry->conn->nscans = 0;
> +       entry->conn->async_state = PGFDW_CONN_IDLE;
> +       entry->conn->async_scan = NULL;
> +   }
> 
> The if condition if (entry->conn == NULL) in GetConnection(), used to track
> whether there is a PGConn active for the given entry, now it tracks whether
> it has PgFdwConn for the same.

After some consideration, I decided to make PgFdwConn be handled
more like PGconn. As a result, the patch has shrunk and looks
clearer.

- Added macros to encapsulate the PgFdwConn struct (one of them is actually a function).

- Added macros to call PQxxx functions taking PgFdwConn.

- connect_pg_server() returns PgFdwConn.

- connection.c does not touch the inside of PgFdwConn except at a
  few points. PgFdwConn's memory is allocated with malloc(), as
  PGconn's is, and freed by PFCfinish(), the counterpart of
  PQfinish().
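A minimal sketch of this encapsulation pattern, assuming a stand-in RawConn type in place of PGconn so that it compiles without libpq. WrappedConn, WC_RAW(), WC_NSCANS(), WC_STATUS(), wc_connect() and wc_finish() are hypothetical names for illustration only; in the patch the corresponding pieces are PgFdwConn, PFC_PGCONN(), PFC_NSCANS(), PFCstatus(), PFCconnectdbParams() and PFCfinish().

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-in for libpq's PGconn so the sketch compiles without libpq. */
typedef struct RawConn
{
	int			open;
} RawConn;

static RawConn *
raw_connect(void)
{
	RawConn    *raw = malloc(sizeof(RawConn));

	if (raw != NULL)
		raw->open = 1;
	return raw;
}

static int
raw_status(const RawConn *raw)
{
	return raw != NULL && raw->open;
}

static void
raw_finish(RawConn *raw)
{
	assert(raw == NULL || raw->open);
	free(raw);					/* accepts NULL, as free() does */
}

/* Hypothetical wrapper in the spirit of PgFdwConn. */
typedef struct WrappedConn
{
	RawConn    *raw;			/* underlying connection, or NULL */
	int			nscans;			/* scans currently sharing it */
	void	   *async_scan;		/* scan owning an in-flight async FETCH */
} WrappedConn;

/* Accessor macros usable as lvalues, like PFC_PGCONN() in the patch. */
#define WC_RAW(c)		((c)->raw)
#define WC_NSCANS(c)	((c)->nscans)
/* Forwarding macro in the style of the PFCxxx() wrappers. */
#define WC_STATUS(c)	raw_status(WC_RAW(c))

/* Allocate the wrapper with malloc() and open the underlying connection. */
static WrappedConn *
wc_connect(void)
{
	WrappedConn *c = malloc(sizeof(WrappedConn));

	if (c == NULL)
		return NULL;
	WC_NSCANS(c) = 0;
	c->async_scan = NULL;
	WC_RAW(c) = raw_connect();
	return c;
}

/* Counterpart of PQfinish(): release the raw connection and the wrapper. */
static void
wc_finish(WrappedConn *c)
{
	if (c == NULL)
		return;
	raw_finish(WC_RAW(c));
	free(c);
}
```

One plausible reason for malloc() over palloc() is that the wrapper then lives exactly as long as the connection it wraps, independent of any memory context. Since wc_finish() frees the wrapper, any cached pointer to it (such as one held in a connection cache entry) has to be reset to NULL afterwards.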

As a result of these changes, the patch now has the following
shape.

- All places where PGconn was used now use PgFdwConn. These are
  mostly simple replacements.

- The major functional changes are concentrated in
  fetch_more_data(), postgresBeginForeignScan(), GetConnection(),
  ReleaseConnection(), and the new function
  finish_async_connection().
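The core of the change in fetch_more_data() is a pipelining pattern: when a scan is the sole user of its connection, it sends the next FETCH right after reading the current batch, so the remote side works while the local side consumes rows. Below is a minimal model of that pattern, assuming a mock remote cursor; MockRemote, Scan, send_fetch(), get_result() and fetch_more() are hypothetical stand-ins for a real connection driven with PQsendQuery()/PQgetResult().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mock of a remote cursor answering FETCH requests. */
typedef struct
{
	int			remaining;		/* rows left on the remote cursor */
	int			fetch_size;		/* rows per FETCH */
	bool		request_pending;	/* a FETCH was sent but not yet read */
	int			sends;			/* number of FETCH requests sent */
} MockRemote;

typedef enum { ST_IDLE, ST_ASYNC, ST_SYNC } State;

typedef struct
{
	MockRemote *remote;
	State		state;
	int			nscans;			/* scans sharing the connection */
	int			num_tuples;		/* rows in the current local batch */
	bool		eof_reached;
} Scan;

static void
send_fetch(MockRemote *r)
{
	assert(!r->request_pending);
	r->request_pending = true;
	r->sends++;
}

/* Read the reply to the pending FETCH; returns the number of rows. */
static int
get_result(MockRemote *r)
{
	int			n = r->remaining < r->fetch_size ? r->remaining : r->fetch_size;

	assert(r->request_pending);
	r->request_pending = false;
	r->remaining -= n;
	return n;
}

static void
fetch_more(Scan *s)
{
	switch (s->state)
	{
		case ST_IDLE:
			if (s->nscans == 1)
			{
				/* Sole user: send the FETCH and return without waiting. */
				send_fetch(s->remote);
				s->state = ST_ASYNC;
				s->num_tuples = 0;
				return;
			}
			/* Shared connection: send and wait, i.e. synchronous. */
			s->state = ST_SYNC;
			send_fetch(s->remote);
			break;
		case ST_ASYNC:
			if (s->nscans != 1)
				s->state = ST_SYNC;	/* connection became shared */
			break;
		case ST_SYNC:
			assert(false);		/* never entered between calls */
			return;
	}

	s->num_tuples = get_result(s->remote);
	s->eof_reached = s->num_tuples < s->remote->fetch_size;

	/* Prefetch the next batch while the caller consumes this one. */
	if (s->state == ST_ASYNC && !s->eof_reached)
		send_fetch(s->remote);
	else
		s->state = ST_IDLE;
}
```

The mock skips one libpq detail: after PQsendQuery(), PQgetResult() must be called until it returns NULL before the connection accepts another command.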

> Please see more comments inline.

> > * Outline of this patch
> >
> > From some consideration after the previous discussion and
> > comments from others, I judged the original (WIP) patch was
> > overdone as the first step. So I reduced the patch to minimal
> > function. The new patch does the following,
> >
> > - Wrapping PGconn by PgFdwConn in order to handle multiple scans
> >   on one connection.
> >
> > - The core async logic was added in fetch_more_data().
> >
> 
> It might help if you can explain this logic in this mail as well as in code
> (as per my comment above).

I wrote an outline of the logic in the comment for enum
PgFdwConnState in postgres_fdw.h. Does it make sense?

> > * Where this patch will be effective.
> >
> > With upcoming inheritance-partition feature, this patch enables
> > stating and running foreign scans asynchronously. It will be more
> > effective for longer TAT or remote startup times, and larger
> > number of foreign servers. No negative performance effect on
> > other situations.
> >
> >
> AFAIU, this logic sends only the first query in asynchronous manner not all
> of them. Is that right? If yes, I think it's a sever limitation of the
> feature. For a query involving multiple foreign scans, only the first one
> will be done in async fashion and not the rest. Sorry, if my understanding
> is wrong.

You're right on the first point. So the case where I think this is
effective is sharding, where each remote server can have a
dedicated PGconn connection. In addition, the ongoing FDW join
pushdown work should increase the chances for async execution in
this manner. I found it difficult to find an appropriate policy
for managing the load on a remote server when multiple PGconn
connections are used for a single remote server, so that would be
the next issue.
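To illustrate why dedicated per-server connections matter for sharding, here is an idealized back-of-the-envelope model. It assumes every shard has its own connection, all first FETCHes are sent during BeginForeignScan, shards are consumed one after another (as under an Append), and only the first batch of each shard is considered. The function names and inputs are hypothetical; real figures have to come from measurements.

```c
/*
 * Idealized model: startup[i] is the remote time until shard i's first
 * batch is ready, consume[i] the local time to process that batch.
 */

/* Sequential: each shard's query is sent only when the scan reaches it. */
static double
sequential_elapsed(const double *startup, const double *consume, int n)
{
	double		finish = 0.0;

	for (int i = 0; i < n; i++)
		finish += startup[i] + consume[i];
	return finish;
}

/* Overlapped: all queries are sent at time 0; wait only if not ready yet. */
static double
overlapped_elapsed(const double *startup, const double *consume, int n)
{
	double		finish = 0.0;

	for (int i = 0; i < n; i++)
	{
		double		start = startup[i] > finish ? startup[i] : finish;

		finish = start + consume[i];
	}
	return finish;
}
```

For example, four shards with 10 ms startup and 1 ms consumption each give 44 ms sequentially versus 14 ms overlapped in this model.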

> I think, we need some data which shows the speed up by this patch. You may
> construct a case, where a single query involved multiple foreign scans, and
> we can check what is the speed up obtained against the number of foreign
> scans.

Agreed. I'll post some such figures later.

> > * Concerns about this patch.
> >
> > - This breaks the assumption that scan starts at ExecForeignScan,
> >   not ExecInitForeignScan, which might cause some problem.
> >
> >
> This should be fine as long as it doesn't have any side effects like
> sending query during EXPLAIN (which has been taken care of in this patch.)
> Do you think, we need any special handling for PREPAREd statements?

I suppose there's no difference between PREPAREd and
non-PREPAREd statements at the FDW level.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From a04a2f8ff32cf3095f7769eedde11ca946f024e5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 28 Nov 2014 10:52:41 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v2

This is the modified version of Asynchronous execution of
postgres_fdw.

- Refactored to make the code simpler.
- Added comment about logic outline and struct members.
---
 contrib/postgres_fdw/connection.c   |  84 ++++++------
 contrib/postgres_fdw/postgres_fdw.c | 255 +++++++++++++++++++++++++++---------
 contrib/postgres_fdw/postgres_fdw.h |  84 +++++++++++-
 3 files changed, 318 insertions(+), 105 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..574b08e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,7 +161,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFC_PGCONN(entry->conn), server->servername);
 	}
 
 	/*
@@ -169,6 +169,13 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	 */
 	begin_remote_xact(entry);
 
+	/*
+	 * Finish async query immediately if another foreign scan node sharing
+	 * this connection comes.
+	 */
+	if (++entry->conn->nscans > 1 && PFC_IS_ASYNC_RUNNING(entry->conn))
+		fetch_more_data(entry->conn->async_scan);
+
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
@@ -178,10 +185,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +230,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +253,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +270,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +319,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +355,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +386,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFC_PGCONN(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +415,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* ongoing async query should be canceled if no scans left */
+	if (--PFC_NSCANS(conn) == 0)
+		finish_async_connection(conn);
 }
 
 /*
@@ -429,7 +434,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +448,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +467,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +495,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +547,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFC_PGCONN(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +572,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +602,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +613,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -620,17 +625,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFC_RESET(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p", PFC_PGCONN(entry->conn));
+			PFCfinish(entry->conn);
 		}
 	}
 
@@ -676,6 +681,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan if running */
+		PFC_RESET(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +709,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..552b0d4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -136,7 +136,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +156,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -167,7 +168,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +299,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +307,8 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,6 +329,18 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
+/* wrapper functions for libpq functions */
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	PFC_INIT(ret);
+	PFC_PGCONN(ret) = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -982,6 +994,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.
+	 */
+	if (PFC_NSCANS(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1021,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+		create_cursor(fsstate);
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1030,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1090,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1398,13 +1419,13 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1468,13 +1489,13 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1538,13 +1559,13 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1615,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1747,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1857,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1873,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1938,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +2005,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						 NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,15 +2021,18 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/* Start async scan if this is the first scan */
+	if (PFC_NSCANS(conn) == 1)
+		fetch_more_data(fsstate);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
-static void
-fetch_more_data(ForeignScanState *node)
+void
+fetch_more_data(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
@@ -2024,7 +2047,7 @@ fetch_more_data(ForeignScanState *node)
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
 		int			numrows;
@@ -2036,9 +2059,64 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		switch (conn->async_state)
+		{
+		case PGFDW_CONN_IDLE:
+			Assert(conn->async_scan == NULL);
+
+			/* Do async fetch only when only one scan uses this connection */
+			if (conn->nscans == 1)
+			{
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				conn->async_state = PGFDW_CONN_ASYNC_RUNNING;
+				conn->async_scan = fsstate;
+				goto end_of_fetch;
+			}
+
+			/* Do synchronous query execution */
+			conn->async_state = PGFDW_CONN_SYNC_RUNNING;
+			res = PFCexec(conn, sql);
+			break;
+
+		case PGFDW_CONN_ASYNC_RUNNING:
+			Assert(conn->async_scan != NULL);
+
+			res = PFCgetResult(conn);
+			if (PQntuples(res) == fetch_size)
+			{
+				/*
+				 * Connection state doesn't go to IDLE even if all data
+				 * has been sent to client for asynchronous query. One
+				 * more PQgetResult() is needed to reset the state to
+				 * IDLE.  See PQexecFinish() for details.
+				 */
+				if (PFCgetResult(conn) != NULL)
+					elog(ERROR, "Connection status error.");
+			}
+
+			if (conn->nscans == 1)
+				break;
+
+			/*
+			 * If nscans is more than 1, stop invoking commands asynchronously
+			 * for multiple scans on this connection. If nscans is zero, the
+			 * async command on this connection should be finished immediately.
+			 */
+			conn->async_state = PGFDW_CONN_SYNC_RUNNING;
+			conn->async_scan = NULL;
+			break;
+
+		default:
+			elog(ERROR, "unexpected async state: %d", conn->async_state);
+			break;
+
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res &&  PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
@@ -2066,6 +2144,33 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		switch(conn->async_state)
+		{
+		case PGFDW_CONN_ASYNC_RUNNING:
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next bunch of tuples if
+				 * we're on asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				break;
+			}
+
+			/* Fall through */
+		case PGFDW_CONN_SYNC_RUNNING:
+			PFC_SET_IDLE(conn);
+			break;
+
+		default:
+			elog(ERROR, "unexpected async state: %d", conn->async_state);
+			break;
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here but needed to make compiler quiet. */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2184,32 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Force cancelling async command state.
+ */
+void
+finish_async_connection(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = conn->async_scan;
+	PgFdwConn *async_conn;
+
+	/* Nothing to do if no async connection */
+	if (fsstate == NULL) return;
+	async_conn = fsstate->conn;
+	Assert(async_conn);
+
+	/* Finish async command if any */
+	if (PFC_IS_ASYNC_RUNNING(async_conn))
+		fetch_more_data(async_conn->async_scan);
+
+	Assert(async_conn->async_state == PGFDW_CONN_IDLE &&
+		   async_conn->async_scan == NULL);
+
+	/* Immediately discard the result */
+	fsstate->next_tuple = 0;
+	fsstate->num_tuples = 0;
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2263,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2274,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2175,11 +2306,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2428,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn  *conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2460,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2510,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn  *conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2554,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2584,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2713,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn  *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2746,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2759,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2854,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..2c81189 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -20,17 +20,91 @@
 
 #include "libpq-fe.h"
 
+/*
+ * PgFdwConnState - states of PgFdwConn
+ *
+ * PgFdwConn manages asynchronous query execution status on a PGconn
+ * connection.  Since one PGconn cannot run multiple asynchronous queries at
+ * once, an ongoing async query is finished immediately when another user
+ * claims the PgFdwConn.  This state transition is represented by the
+ * enumeration PgFdwConnState and mostly happens in fetch_more_data().
+ *
+ * PGFDW_CONN_ASYNC_RUNNING is entered when fetch_more_data() is called on a
+ * PgFdwConn in IDLE state, and only one scan can hold it.  Called on a
+ * PgFdwConn in this state, fetch_more_data() sends the next FETCH request as
+ * soon as it gets the result of the previous one, unless EOF was reached.
+ *
+ * PGFDW_CONN_SYNC_RUNNING is an internal state of fetch_more_data().  It
+ * indicates that the function must not send the next FETCH request after
+ * getting the result.
+ */
+typedef enum PgFdwConnState
+{
+	PGFDW_CONN_IDLE,			/* running no query */
+	PGFDW_CONN_ASYNC_RUNNING,	/* running a query asynchronously */
+	PGFDW_CONN_SYNC_RUNNING		/* running a query synchronously */
+} PgFdwConnState;
+
+typedef struct PgFdwConn
+{
+	PGconn *pgconn;				/* underlying libpq connection */
+	int		nscans;				/* number of scans using this connection */
+	PgFdwConnState	async_state;/* query running state */
+	struct PgFdwScanState *async_scan; /* the scan currently running an
+										* async query on this connection */
+} PgFdwConn;
+
+/* Macros to operate PgFdwConn */
+#define PFC_IS_ASYNC_RUNNING(c) ((c)->async_state == PGFDW_CONN_ASYNC_RUNNING)
+#define PFC_PGCONN(c)	((c)->pgconn)
+#define PFC_NSCANS(c)	((c)->nscans)
+#define PFC_SET_IDLE(c)	((c)->async_scan = NULL, \
+						 (c)->async_state = PGFDW_CONN_IDLE)
+#define PFC_RESET(c) \
+	((PFC_IS_ASYNC_RUNNING(c) ? PFCconsumeInput(c) : 0),	\
+	 PFC_SET_IDLE(c), PFC_NSCANS(c) = 0)
+#define PFC_INIT(c)		(PFC_NSCANS(c) = 0, PFC_SET_IDLE(c))
+
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+/* libpq wrappers to take PgFdwConn* instead of PGconn* */
+#define PFCsendQuery(c,q) PQsendQuery((c)->pgconn, (q))
+#define PFCexec(c, q) PQexec((c)->pgconn, (q))
+#define PFCexecParams(c, q, n, t, v, l, f, rf)			\
+	PQexecParams((c)->pgconn,(q),(n),(t),(v),(l),(f),(rf))
+#define PFCprepare(c, sn, q, n, t) PQprepare((c)->pgconn,(sn),(q),(n),(t))
+#define PFCexecPrepared(c, sn, n, v, l, f, rf) \
+	PQexecPrepared((c)->pgconn,(sn),(n),(v),(l),(f),(rf))
+#define PFCgetResult(c) PQgetResult((c)->pgconn)
+#define PFCconsumeInput(c) PQconsumeInput((c)->pgconn)
+#define PFCisBusy(c) PQisBusy((c)->pgconn)
+#define PFCstatus(c) PQstatus((c)->pgconn)
+#define PFCtransactionStatus(c) PQtransactionStatus((c)->pgconn)
+#define PFCserverVersion(c) PQserverVersion((c)->pgconn)
+#define PFCerrorMessage(c) PQerrorMessage((c)->pgconn)
+#define PFCconnectionUsedPassword(c) PQconnectionUsedPassword((c)->pgconn)
+
+/* Not a simple wrapper of PQfinish: it also frees the PgFdwConn */
+#define PFCfinish(c) (PQfinish((c)->pgconn), PFC_FREE(c))
+
+/* libpq wrapper functions */
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void fetch_more_data(struct PgFdwScanState *node);
+extern void finish_async_connection(PgFdwConn *conn);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT
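
The state transitions described in the new PgFdwConnState comment can be tried out without a server. The sketch below is a toy model, not code from the patch: ToyConn, ToyScan, toy_start_async() and toy_receive_result() are hypothetical names, no libpq call is made, and "sending" a FETCH only bumps a counter. It assumes, as postgres_fdw does, that a FETCH returning fewer rows than the fetch size (100) means the remote cursor is exhausted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same three states as the patch's PgFdwConnState. */
typedef enum PgFdwConnState
{
	PGFDW_CONN_IDLE,
	PGFDW_CONN_ASYNC_RUNNING,
	PGFDW_CONN_SYNC_RUNNING
} PgFdwConnState;

#define TOY_FETCH_SIZE 100		/* fetch size assumed by this model */

/* Hypothetical stand-in for PgFdwScanState: only what the model needs. */
typedef struct ToyScan
{
	int			remote_rows;	/* rows still left on the remote cursor */
	bool		eof_reached;
} ToyScan;

/* Hypothetical stand-in for PgFdwConn. */
typedef struct ToyConn
{
	PgFdwConnState async_state;
	ToyScan    *async_scan;
	int			fetches_sent;	/* number of FETCH requests "sent" */
} ToyConn;

/* First fetch on an idle connection: enter ASYNC_RUNNING and send a FETCH. */
static void
toy_start_async(ToyConn *conn, ToyScan *scan)
{
	assert(conn->async_state == PGFDW_CONN_IDLE);
	conn->async_state = PGFDW_CONN_ASYNC_RUNNING;
	conn->async_scan = scan;
	conn->fetches_sent++;
}

/*
 * Consume the result of the outstanding FETCH, then pick the next state
 * using the same switch shape as the patched fetch_more_data().
 */
static void
toy_receive_result(ToyConn *conn, ToyScan *scan)
{
	int			got = scan->remote_rows < TOY_FETCH_SIZE ?
		scan->remote_rows : TOY_FETCH_SIZE;

	scan->remote_rows -= got;
	/* Fewer rows than requested means the cursor is exhausted. */
	scan->eof_reached = (got < TOY_FETCH_SIZE);

	switch (conn->async_state)
	{
		case PGFDW_CONN_ASYNC_RUNNING:
			if (!scan->eof_reached)
			{
				/* Request the next batch right away; stay ASYNC_RUNNING. */
				conn->fetches_sent++;
				break;
			}
			/* FALLTHROUGH */
		case PGFDW_CONN_SYNC_RUNNING:
			conn->async_scan = NULL;
			conn->async_state = PGFDW_CONN_IDLE;
			break;
		default:
			assert(!"unexpected async state");
			break;
	}
}
```

Driving the model with 250 remote rows sends three FETCHes (100, 100, 50). With 200 rows it also sends three, the last one returning no rows, because EOF is only detected on a short batch.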

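The header comment says that an ongoing async query is finished as soon as another user claims the connection, and that SYNC_RUNNING keeps fetch_more_data() from sending another FETCH. The following toy model sketches that hand-over. It is an assumption-laden illustration, not the patch's code: toy_claim(), ToyConn and ToyScan are hypothetical, and the exact point where the patch switches a connection to SYNC_RUNNING lies in a part of fetch_more_data() not shown in this excerpt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same three states as the patch's PgFdwConnState. */
typedef enum PgFdwConnState
{
	PGFDW_CONN_IDLE,
	PGFDW_CONN_ASYNC_RUNNING,
	PGFDW_CONN_SYNC_RUNNING
} PgFdwConnState;

#define TOY_FETCH_SIZE 100		/* fetch size assumed by this model */

/* Hypothetical scan: rows left remotely and rows buffered locally. */
typedef struct ToyScan
{
	int			remote_rows;
	int			num_tuples;		/* rows buffered by the last FETCH */
	bool		eof_reached;
} ToyScan;

/* Hypothetical connection wrapper. */
typedef struct ToyConn
{
	PgFdwConnState async_state;
	ToyScan    *async_scan;
	int			fetches_sent;
} ToyConn;

/* Collect the outstanding FETCH result into scan and pick the next state. */
static void
toy_receive_result(ToyConn *conn, ToyScan *scan)
{
	int			got = scan->remote_rows < TOY_FETCH_SIZE ?
		scan->remote_rows : TOY_FETCH_SIZE;

	scan->remote_rows -= got;
	scan->num_tuples = got;
	scan->eof_reached = (got < TOY_FETCH_SIZE);

	if (conn->async_state == PGFDW_CONN_ASYNC_RUNNING && !scan->eof_reached)
	{
		conn->fetches_sent++;	/* keep the next FETCH in flight */
		return;
	}
	/* SYNC_RUNNING, or ASYNC_RUNNING at EOF: the connection goes idle. */
	conn->async_scan = NULL;
	conn->async_state = PGFDW_CONN_IDLE;
}

/*
 * Another scan (or a modify) wants the connection.  If a different scan
 * has a FETCH in flight, switch to SYNC_RUNNING so the pending result is
 * collected without issuing a follow-up FETCH, leaving the connection idle.
 */
static void
toy_claim(ToyConn *conn, ToyScan *claimant)
{
	if (conn->async_state == PGFDW_CONN_ASYNC_RUNNING &&
		conn->async_scan != claimant)
	{
		conn->async_state = PGFDW_CONN_SYNC_RUNNING;
		toy_receive_result(conn, conn->async_scan);
	}
	assert(conn->async_state == PGFDW_CONN_IDLE ||
		   conn->async_scan == claimant);
}
```

In this model the interrupted scan keeps the 100 rows it had already requested and can continue synchronously later. What to do with those buffered rows is a separate design choice; the patch's finish_async_connection() resets next_tuple and num_tuples after draining.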