On 2016/04/11 12:30, Michael Paquier wrote:
> + if ((cancel = PQgetCancel(entry->conn)))
> + {
> + PQcancel(cancel, errbuf, sizeof(errbuf));
> + PQfreeCancel(cancel);
> + }
>
> Wouldn't it be better to issue a WARNING here if PQcancel does not succeed?

Seems like a good idea. Attached is an updated version of the patch.
Thanks for the review!
Best regards,
Etsuro Fujita
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..da1758b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -598,6 +598,34 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
+ /*
+ * If we had submitted a command to the remote server using
+ * an asynchronous execution function, the command might
+ * not have completed yet. Check to see if a command is
+ * still being processed by the remote server, and if so,
+ * request cancellation of the command; if not, abort
+ * gracefully.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ PGcancel *cancel;
+ char errbuf[256];
+
+ if ((cancel = PQgetCancel(entry->conn)))
+ {
+ /*
+ * Note: can't throw ERROR, it would be infinite
+ * loop
+ */
+ if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send cancel request: %s",
+ errbuf)));
+ PQfreeCancel(cancel);
+ }
+ break;
+ }
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..0378f1d 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -32,6 +32,7 @@
#include "optimizer/var.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
@@ -3131,6 +3132,7 @@ execute_dml_stmt(ForeignScanState *node)
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
+ int wc;
/*
* Construct array of query parameter values in text format.
@@ -3147,12 +3149,42 @@ execute_dml_stmt(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+ /*
+ * Receive data until PQgetResult is ready to get the result without
+ * blocking.
+ */
+ while (PQisBusy(dmstate->conn))
+ {
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE,
+ PQsocket(dmstate->conn),
+ -1L);
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Data available in socket */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(dmstate->conn))
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, false,
+ dmstate->query);
+ }
+ }
+
+ /*
+ * Get the result
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
- numParams, NULL, values, NULL, NULL, 0);
+ dmstate->result = PQgetResult(dmstate->conn);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers