On Fri, Apr 15, 2016 at 8:25 PM, Etsuro Fujita
<fujita.ets...@lab.ntt.co.jp> wrote:
> How about doing something similar for PQprepare/PQexecPrepared in
> postgresExecForeignInsert, postgresExecForeignUpdate, and
> postgresExecForeignDelete?

Yes, I hesitated to touch those, but they are good candidates for this
new interface, and actually it has proved not to be complicated to
plug in the new routines in those code paths.

> Also, how about doing that for PQexec in connection.c?

Here I disagree, this is not adapted. All the PQexec calls are part of
callbacks that are triggered on failures, and we rely on such a
callback when issuing PQcancel. do_sql_command runs commands that take
a short amount of time, so I think as well that it is fine as-is with
PQexec.
-- 
Michael
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..64a00b4 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
@@ -448,6 +449,67 @@ GetPrepStmtNumber(PGconn *conn)
 }
 
 /*
+ * Execute query in non-blocking mode and fetch back result
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+	if (!PQsendQuery(conn, query))
+		pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+	return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Scan for results of query run in non-waiting mode
+ *
+ * This should be run after executing a query on the remote server and
+ * offers quick responsiveness by checking for any interruptions. The
+ * last result is returned back to caller, and previous results are
+ * consumed. Caller is responsible for the error handling on the fetched
+ * result.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+	PGresult   *last_res;
+
+	for (;;)
+	{
+		PGresult *res;
+
+		while (PQisBusy(conn))
+		{
+			int		wc;
+
+			/* Sleep until there's something to do */
+			wc = WaitLatchOrSocket(MyLatch,
+								   WL_LATCH_SET | WL_SOCKET_READABLE,
+								   PQsocket(conn),
+								   -1L);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Data available in socket */
+			if (wc & WL_SOCKET_READABLE)
+			{
+				if (!PQconsumeInput(conn))
+					pgfdw_report_error(ERROR, NULL, conn, false, query);
+			}
+		}
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+			break;
+
+		last_res = res;
+	}
+
+	return last_res;
+}
+
+/*
  * Report an error we got from the remote server.
  *
  * elevel: error level to use (typically ERROR, but might be less)
@@ -598,6 +660,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
+
+					/*
+					 * If a command has been submitted to the remote server
+					 * using an asynchronous execution function, the command
+					 * might not have yet completed.  Check to see if a command
+					 * is still being processed by the remote server, and if so,
+					 * request cancellation of the command; if not, abort
+					 * gracefully.
+					 */
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					{
+						PGcancel   *cancel;
+						char		errbuf[256];
+
+						if ((cancel = PQgetCancel(entry->conn)))
+						{
+							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+								ereport(WARNING,
+										(errcode(ERRCODE_CONNECTION_FAILURE),
+										 errmsg("could not send cancel request: %s",
+												errbuf)));
+							PQfreeCancel(cancel);
+						}
+						break;
+					}
+
 					/* If we're aborting, abort all remote transactions too */
 					res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..4566cf4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1754,13 +1754,16 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1824,13 +1827,16 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);;
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1894,13 +1900,16 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1950,7 +1959,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -2712,7 +2721,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -2821,8 +2830,11 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	if (!PQsendQueryParams(conn, buf.data, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+	res = pgfdw_get_result(conn, buf.data);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2868,7 +2880,7 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +2990,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = pgfdw_exec_query(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -3010,12 +3022,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
-
+	if (!PQsendPrepare(fmstate->conn,
+					   p_name,
+					   fmstate->query,
+					   0,
+					   NULL))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -3151,8 +3165,12 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-								   numParams, NULL, values, NULL, NULL, 0);
+	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3373,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -3449,7 +3467,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -3500,7 +3518,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = pgfdw_exec_query(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3693,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -3774,7 +3792,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3a11d99..574b07d 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to