I revised the patch so that asynchronous scans are done more
aggressively, and measured the execution time for two very simple
cases. As a result, a simple seq scan gained 5% and a hash join of
two foreign tables gained 150% (2.4 times faster).
While measuring the performance, I noticed that in many cases, such
as hash joins or sorted joins, each scan in a query runs in one go
rather than alternating with the other scans. So I modified the
patch so that asynchronous fetches are done more aggressively. The
new v4 patch is attached, and the following numbers are based on
it.
========
The first test is a simple seq scan.
> CREATE TABLE lt1 (a int, b timestamp, c text);
> CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host
> 'localhost');
> CREATE USER MAPPING FOR PUBLIC SERVER sv1;
> CREATE FOREIGN TABLE ft1 () SERVER sv1 OPTIONS (table_name 'lt1');
> INSERT INTO lt1 (SELECT a, now(), repeat('x', 128) FROM generate_series(0,
> 999999) a);
In this case, I took the average execution time over 10 runs of the
following query for both master HEAD and the patched version. The
fetch size is 100.
> postgres=# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT * FROM ft1;
> QUERY PLAN
> ------------------------------------------------------------------
> Foreign Scan on ft1 (actual time=0.795..4175.706 rows=1000000 loops=1)
> Planning time: 0.060 ms
> Execution time: 4276.043 ms
master head : avg = 4256.621, std dev = 17.099
patched pgfdw: avg = 4036.463, std dev = 2.608
The patched version is faster by about 5%. This should be purely the
result of asynchronous fetching, not including the effect of
starting remote execution early in ExecInit.
Interestingly, as fetch_count gets larger, the gain increases in
spite of the decrease in the number of queries sent.
master head : avg = 2622.759, std dev = 38.379
patched pgfdw: avg = 2277.622, std dev = 27.269
About a 15% gain. And for a fetch size of 10000:
master head : avg = 2000.980, std dev = 6.434
patched pgfdw: avg = 1616.793, std dev = 13.192
19%. It is natural that execution time decreases as the fetch size
increases, but I haven't yet found the reason why the patch's gain
also increases.
======================
The second case is a simple join of two foreign tables sharing
one connection.
Master HEAD runs this query in about 16 seconds with almost
no fluctuation across multiple runs.
> =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
> FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
> QUERY PLAN
> ----------------------------------------------------------------------------
> Hash Join (actual time=7541.831..15924.631 rows=1000000 loops=1)
> Hash Cond: (x.a = y.a)
> -> Foreign Scan on ft1 x (actual time=1.176..6553.480 rows=1000000 loops=1)
> -> Hash (actual time=7539.761..7539.761 rows=1000000 loops=1)
> Buckets: 32768 Batches: 64 Memory Usage: 2829kB
> -> Foreign Scan on ft1 y (actual time=1.067..6529.165 rows=1000000
> loops=1)
> Planning time: 0.223 ms
> Execution time: 15973.916 ms
But the v4 patch mysteriously accelerates this query to 6.5 seconds.
> =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
> FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
> QUERY PLAN
> ----------------------------------------------------------------------------
> Hash Join (actual time=2556.977..5812.937 rows=1000000 loops=1)
> Hash Cond: (x.a = y.a)
> -> Foreign Scan on ft1 x (actual time=32.689..1936.565 rows=1000000
> loops=1)
> -> Hash (actual time=2523.810..2523.810 rows=1000000 loops=1)
> Buckets: 32768 Batches: 64 Memory Usage: 2829kB
> -> Foreign Scan on ft1 y (actual time=50.345..1928.411 rows=1000000
> loops=1)
> Planning time: 0.220 ms
> Execution time: 6512.043 ms
The result data does not seem to be broken. I don't know the reason
yet, but I'll investigate it.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
>From edba0530fb6a9c5a4e6def055757d6d60bce9171 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <[email protected]>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v4
This is a modified version of the asynchronous execution patch
for postgres_fdw.
- Do async fetch more aggressively than v3.
- No additional tests yet :(
---
contrib/postgres_fdw/Makefile | 2 +-
contrib/postgres_fdw/PgFdwConn.c | 200 +++++++++++++++++++++++++
contrib/postgres_fdw/PgFdwConn.h | 61 ++++++++
contrib/postgres_fdw/connection.c | 82 ++++++-----
contrib/postgres_fdw/postgres_fdw.c | 283 +++++++++++++++++++++++++++---------
contrib/postgres_fdw/postgres_fdw.h | 15 +-
6 files changed, 527 insertions(+), 116 deletions(-)
create mode 100644 contrib/postgres_fdw/PgFdwConn.c
create mode 100644 contrib/postgres_fdw/PgFdwConn.h
diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
# contrib/postgres_fdw/Makefile
MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,255 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE() ((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c) free(c)
+
+/*
+ * Wrapper around a libpq connection that also records how many scans share
+ * it and which scan, if any, has an asynchronous query outstanding on it.
+ * The wrapper is allocated with malloc (PFC_ALLOCATE) and released by
+ * PFCfinish().
+ */
+struct pgfdw_conn
+{
+	PGconn	   *pgconn;			/* libpq connection for this connection */
+	int			nscans;			/* number of scans using this connection */
+	struct PgFdwScanState *async_scan;	/* scan currently running an async
+										 * query on this connection, or NULL */
+};
+
+/*
+ * Accessors for async_scan.  Callers set it after sending a query with
+ * PFCsendQuery() and clear it once the result has been collected; NULL
+ * means no asynchronous query is outstanding.
+ */
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+/* Returns nonzero if some scan has an asynchronous query outstanding. */
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+/* Returns the wrapped libpq connection. */
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+/*
+ * Number of scans sharing this connection.  The increment and decrement
+ * functions return the updated count.
+ */
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+/*
+ * Read whatever input has already arrived for an outstanding asynchronous
+ * query.  Called from the transaction callbacks.
+ *
+ * A connection cache entry may hold a NULL connection, so NULL is accepted
+ * and ignored rather than dereferenced.
+ *
+ * NOTE(review): this neither waits for nor discards the pending result and
+ * leaves async_scan set; it presumably relies on a later PFCexec() or
+ * PFCinit() to settle the connection -- confirm.
+ */
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (conn == NULL)
+		return;
+
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+/* Forget any asynchronous scan and reset the scan count. */
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+/*
+ * The functions below forward to the libpq function of the same name,
+ * applied to the wrapped PGconn.
+ */
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn,
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+/*
+ * Close the libpq connection and release the wrapper itself.  NULL is
+ * accepted and ignored.  conn must not be used after this returns.
+ */
+void
+PFCfinish(PgFdwConn *conn)
+{
+	if (conn == NULL)
+		return;
+
+	PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+/*
+ * Allocate a wrapper and open a libpq connection with PQconnectdbParams().
+ *
+ * Never returns NULL: failure to allocate the wrapper raises ERROR.  The
+ * caller must still check PFCstatus(), and must release the result with
+ * PFCfinish() whether or not the connection succeeded.
+ */
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn  *ret = PFC_ALLOCATE();
+
+	if (ret == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..f695f5a
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ * PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;	/* opaque; defined in PgFdwConn.c */
+struct PgFdwScanState;	/* scan state; defined in postgres_fdw.c */
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);	/* record/clear the scan owning the outstanding async query */
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);	/* NULL if no async query is outstanding */
+extern int PFCisAsyncRunning(PgFdwConn *conn);	/* nonzero if an async query is outstanding */
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);	/* the wrapped libpq connection */
+extern int PFCgetNscans(PgFdwConn *conn);	/* number of scans sharing this connection */
+extern int PFCincrementNscans(PgFdwConn *conn);	/* returns the updated count */
+extern int PFCdecrementNscans(PgFdwConn *conn);	/* returns the updated count */
+extern void PFCcancelAsync(PgFdwConn *conn);	/* reads available input if an async query is outstanding */
+extern void PFCinit(PgFdwConn *conn);	/* resets async_scan and nscans */
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+ const char *command,
+ int nParams,
+ const Oid *paramTypes,
+ const char *const * paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+ const char *stmtName, const char *query,
+ int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+ const char *stmtName,
+ int nParams,
+ const char *const * paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);	/* closes the libpq connection */
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+ const char *const * values, int expand_dbname);
+#endif /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..2517f6b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
typedef struct ConnCacheEntry
{
ConnCacheKey key; /* hash key (must be first) */
- PGconn *conn; /* connection to foreign server, or NULL */
+ PgFdwConn *conn; /* connection to foreign server, or NULL */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
static bool xact_got_connection = false;
/* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* be useful and not mere pedantry. We could not flush any active connections
* mid-transaction anyway.
*/
-PGconn *
+PgFdwConn *
GetConnection(ForeignServer *server, UserMapping *user,
bool will_prep_stmt)
{
@@ -161,9 +161,12 @@ GetConnection(ForeignServer *server, UserMapping *user,
entry->have_error = false;
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
- entry->conn, server->servername);
+ PFCgetPGconn(entry->conn), server->servername);
+
}
+ PFCincrementNscans(entry->conn);
+
/*
* Start a new transaction or subtransaction if needed.
*/
@@ -178,10 +181,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
/*
* Connect to remote server using specified server and user mapping properties.
*/
-static PGconn *
+static PgFdwConn *
connect_pg_server(ForeignServer *server, UserMapping *user)
{
- PGconn *volatile conn = NULL;
+ PgFdwConn *volatile conn = NULL;
/*
* Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +226,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
/* verify connection parameters and make connection */
check_conn_params(keywords, values);
- conn = PQconnectdbParams(keywords, values, false);
- if (!conn || PQstatus(conn) != CONNECTION_OK)
+ conn = PFCconnectdbParams(keywords, values, false);
+ if (!conn || PFCstatus(conn) != CONNECTION_OK)
{
char *connmessage;
int msglen;
/* libpq typically appends a newline, strip that */
- connmessage = pstrdup(PQerrorMessage(conn));
+ connmessage = pstrdup(PFCerrorMessage(conn));
msglen = strlen(connmessage);
if (msglen > 0 && connmessage[msglen - 1] == '\n')
connmessage[msglen - 1] = '\0';
@@ -246,7 +249,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
* otherwise, he's piggybacking on the postgres server's user
* identity. See also dblink_security_check() in contrib/dblink.
*/
- if (!superuser() && !PQconnectionUsedPassword(conn))
+ if (!superuser() && !PFCconnectionUsedPassword(conn))
ereport(ERROR,
(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
errmsg("password is required"),
@@ -263,7 +266,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
{
/* Release PGconn data structure if we managed to create one */
if (conn)
- PQfinish(conn);
+ PFCfinish(conn);
PG_RE_THROW();
}
PG_END_TRY();
@@ -312,9 +315,9 @@ check_conn_params(const char **keywords, const char **values)
* there are any number of ways to break things.
*/
static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
{
- int remoteversion = PQserverVersion(conn);
+ int remoteversion = PFCserverVersion(conn);
/* Force the search path to contain only pg_catalog (see deparse.c) */
do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +351,11 @@ configure_remote_session(PGconn *conn)
* Convenience subroutine to issue a non-data-returning SQL command to remote
*/
static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
{
PGresult *res;
- res = PQexec(conn, sql);
+ res = PFCexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -379,7 +382,7 @@ begin_remote_xact(ConnCacheEntry *entry)
const char *sql;
elog(DEBUG3, "starting remote transaction on connection %p",
- entry->conn);
+ PFCgetPGconn(entry->conn));
if (IsolationIsSerializable())
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +411,11 @@ begin_remote_xact(ConnCacheEntry *entry)
* Release connection reference count created by calling GetConnection.
*/
void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
{
- /*
- * Currently, we don't actually track connection references because all
- * cleanup is managed on a transaction or subtransaction basis instead. So
- * there's nothing to do here.
- */
+ /* Settle any in-flight async query once no scans are left */
+ if (PFCdecrementNscans(conn) == 0)
+ finish_async_query(conn);
}
/*
@@ -429,7 +430,7 @@ ReleaseConnection(PGconn *conn)
* collisions are highly improbable; just be sure to use %u not %d to print.
*/
unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
{
return ++cursor_number;
}
@@ -443,7 +444,7 @@ GetCursorNumber(PGconn *conn)
* increasing the risk of prepared-statement name collisions by resetting.
*/
unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
{
return ++prep_stmt_number;
}
@@ -462,7 +463,7 @@ GetPrepStmtNumber(PGconn *conn)
* marked with have_error = true.
*/
void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
bool clear, const char *sql)
{
/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +491,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
* return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
- message_primary = PQerrorMessage(conn);
+ message_primary = PFCerrorMessage(conn);
ereport(elevel,
(errcode(sqlstate),
@@ -542,7 +543,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (entry->xact_depth > 0)
{
elog(DEBUG3, "closing remote transaction on connection %p",
- entry->conn);
+ PFCgetPGconn(entry->conn));
switch (event)
{
@@ -567,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
- res = PQexec(entry->conn, "DEALLOCATE ALL");
+ res = PFCexec(entry->conn, "DEALLOCATE ALL");
PQclear(res);
}
entry->have_prep_stmt = false;
@@ -597,7 +598,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/* If we're aborting, abort all remote transactions too */
- res = PQexec(entry->conn, "ABORT TRANSACTION");
+ res = PFCexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +609,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* As above, make sure to clear any prepared stmts */
if (entry->have_prep_stmt && entry->have_error)
{
- res = PQexec(entry->conn, "DEALLOCATE ALL");
+ res = PFCexec(entry->conn, "DEALLOCATE ALL");
PQclear(res);
}
entry->have_prep_stmt = false;
@@ -620,17 +621,20 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
+ PFCcancelAsync(entry->conn);
+ PFCinit(entry->conn);
/*
* If the connection isn't in a good idle state, discard it to
* recover. Next GetConnection will open a new connection.
*/
- if (PQstatus(entry->conn) != CONNECTION_OK ||
- PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+ if (PFCstatus(entry->conn) != CONNECTION_OK ||
+ PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
{
- elog(DEBUG3, "discarding connection %p", entry->conn);
- PQfinish(entry->conn);
- entry->conn = NULL;
+ elog(DEBUG3, "discarding connection %p",
+ PFCgetPGconn(entry->conn));
+ PFCfinish(entry->conn);
+ entry->conn = NULL; /* don't keep a pointer to the closed connection */
}
}
@@ -676,6 +679,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
PGresult *res;
char sql[100];
+ /* Shut down asynchronous scan if running */
+ PFCcancelAsync(entry->conn);
+
/*
* We only care about connections with open remote subtransactions of
* the current level.
@@ -701,7 +707,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
curlevel, curlevel);
- res = PQexec(entry->conn, sql);
+ res = PFCexec(entry->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, entry->conn, true, sql);
else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..1dfb221 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -123,6 +123,12 @@ enum FdwModifyPrivateIndex
FdwModifyPrivateRetrievedAttrs
};
+typedef enum fetch_mode {	/* how fetch_more_data() may use the connection */
+ START_ONLY,	/* only send the FETCH asynchronously; the result is collected later */
+ FORCE_SYNC,	/* obtain a batch now; send no follow-up asynchronous FETCH */
+ ALLOW_ASYNC	/* obtain a batch now, then send the next FETCH asynchronously unless EOF */
+} fetch_mode;
+
/*
* Execution state of a foreign scan using postgres_fdw.
*/
@@ -136,7 +142,7 @@ typedef struct PgFdwScanState
List *retrieved_attrs; /* list of retrieved attribute numbers */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
+ PgFdwConn *conn; /* connection for the scan */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -156,6 +162,7 @@ typedef struct PgFdwScanState
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
+ ExprContext *econtext; /* copy of ps_ExprContext of ForeignScanState */
} PgFdwScanState;
/*
@@ -167,7 +174,7 @@ typedef struct PgFdwModifyState
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
+ PgFdwConn *conn; /* connection for the scan */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -298,7 +305,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
double *p_rows, int *p_width,
Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
- PGconn *conn,
+ PgFdwConn *conn,
double *rows,
int *width,
Cost *startup_cost,
@@ -306,9 +313,9 @@ static void get_remote_estimate(const char *sql,
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
+static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
@@ -329,7 +336,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
MemoryContext temp_context);
static void conversion_error_callback(void *arg);
-
/*
* Foreign-data wrapper handler function: return a struct with pointers
* to my callback routines.
@@ -982,6 +988,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
else
fsstate->param_values = NULL;
+
+ fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+ /*
+ * Start scanning asynchronously if it is the first scan on this
+ * connection.
+ */
+ if (PFCgetNscans(fsstate->conn) == 1)
+ create_cursor(fsstate);
}
/*
@@ -1000,7 +1015,10 @@ postgresIterateForeignScan(ForeignScanState *node)
* cursor on the remote side.
*/
if (!fsstate->cursor_exists)
- create_cursor(node);
+ {
+ finish_async_query(fsstate->conn);
+ create_cursor(fsstate);
+ }
/*
* Get some more tuples, if we've run out.
@@ -1009,7 +1027,7 @@ postgresIterateForeignScan(ForeignScanState *node)
{
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
- fetch_more_data(node);
+ fetch_more_data(fsstate, ALLOW_ASYNC);
/* If we didn't get any tuples, must be end of data. */
if (fsstate->next_tuple >= fsstate->num_tuples)
return ExecClearTuple(slot);
@@ -1069,7 +1087,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fsstate->conn, sql);
+ res = PFCexec(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1392,19 +1410,22 @@ postgresExecForeignInsert(EState *estate,
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+ /* Finish async query if running */
+ finish_async_query(fmstate->conn);
+
/*
* Execute the prepared statement, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = PFCexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1462,19 +1483,22 @@ postgresExecForeignUpdate(EState *estate,
(ItemPointer) DatumGetPointer(datum),
slot);
+ /* Finish async query if running */
+ finish_async_query(fmstate->conn);
+
/*
* Execute the prepared statement, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = PFCexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1532,19 +1556,22 @@ postgresExecForeignDelete(EState *estate,
(ItemPointer) DatumGetPointer(datum),
NULL);
+ /* Finish async query if running */
+ finish_async_query(fmstate->conn);
+
/*
* Execute the prepared statement, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = PFCexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1621,7 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fmstate->conn, sql);
+ res = PFCexec(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -1726,7 +1753,7 @@ estimate_path_cost_size(PlannerInfo *root,
List *local_join_conds;
StringInfoData sql;
List *retrieved_attrs;
- PGconn *conn;
+ PgFdwConn *conn;
Selectivity local_sel;
QualCost local_cost;
@@ -1836,7 +1863,7 @@ estimate_path_cost_size(PlannerInfo *root,
* The given "sql" must be an EXPLAIN command.
*/
static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
double *rows, int *width,
Cost *startup_cost, Cost *total_cost)
{
@@ -1852,7 +1879,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = PQexec(conn, sql);
+ res = PFCexec(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -1917,13 +1944,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
* Create cursor for node's query with current parameter values.
*/
static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
- ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ ExprContext *econtext = fsstate->econtext;
int numParams = fsstate->numParams;
const char **values = fsstate->param_values;
- PGconn *conn = fsstate->conn;
+ PgFdwConn *conn = fsstate->conn;
StringInfoData buf;
PGresult *res;
@@ -1985,8 +2011,8 @@ create_cursor(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecParams(conn, buf.data, numParams, NULL, values,
- NULL, NULL, 0);
+ res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+ NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
@@ -2001,55 +2027,128 @@ create_cursor(ForeignScanState *node)
/* Clean up */
pfree(buf.data);
+
+ /*
+ * Start async scan if this is the first scan. See fetch_more_data() for
+ * details
+ */
+ if (PFCgetNscans(conn) == 1)
+ fetch_more_data(fsstate, START_ONLY);
}
/*
* Fetch some more rows from the node's cursor.
*/
static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
PGresult *volatile res = NULL;
MemoryContext oldcontext;
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
- * batch.
+ * batch, unless tuples were left unread because an asynchronous fetch
+ * was interrupted; keep those so they are returned before the new ones.
+ * This happens no more than twice in a row.
*/
- fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->tuples = NULL;
+ MemoryContextReset(fsstate->batch_cxt);
+ }
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
- PGconn *conn = fsstate->conn;
+ PgFdwConn *conn = fsstate->conn;
char sql[64];
int fetch_size;
- int numrows;
+ int numrows, addrows, restrows;
+ HeapTuple *tmptuples;
int i;
/* The fetch size is arbitrary, but shouldn't be enormous. */
- fetch_size = 100;
+ fetch_size = 10000;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fetch_size, fsstate->cursor_number);
- res = PQexec(conn, sql);
+ if (PFCisAsyncRunning(conn))
+ {
+ Assert (cmd != START_ONLY);
+
+ /*
+ * If the target fsstate differs from the scan the current async fetch
+ * is running for, first store that fetch's result into its own scan,
+ * then fetch data for the target fsstate synchronously.
+ */
+ if (fsstate != PFCgetAsyncScan(conn))
+ {
+ fetch_more_data(PFCgetAsyncScan(conn), FORCE_SYNC);
+ res = PFCexec(conn, sql);
+ }
+ else
+ {
+ /* Get result of running async fetch */
+ res = PFCgetResult(conn);
+ if (PQntuples(res) == fetch_size)
+ {
+ /*
+ * Connection state doesn't go to IDLE even if all data
+ * has been sent to client for asynchronous query. One
+ * more PQgetResult() is needed to reset the state to
+ * IDLE. See PQexecFinish() for details.
+ */
+ if (PFCgetResult(conn) != NULL)
+ elog(ERROR, "Connection status error.");
+ }
+ }
+ PFCsetAsyncScan(conn, NULL);
+ }
+ else
+ {
+ if (cmd == START_ONLY)
+ {
+ Assert(PFCgetNscans(conn) == 1);
+
+ if (!PFCsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, res, conn, false,
+ fsstate->query);
+
+ PFCsetAsyncScan(conn, fsstate);
+ goto end_of_fetch;
+ }
+
+ /* Otherwise do synchronous query execution */
+ PFCsetAsyncScan(conn, NULL);
+ res = PFCexec(conn, sql);
+ }
+
/* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ if (res && PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
- /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
+ /* allocate tuple storage */
+ tmptuples = fsstate->tuples;
+ addrows = PQntuples(res);
+ restrows = fsstate->num_tuples - fsstate->next_tuple;
+ numrows = restrows + addrows;
fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+
+ Assert(restrows == 0 || tmptuples);
+
+ /* copy unread tuples if any */
+ for (i = 0 ; i < restrows ; i++)
+ fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i];
+
fsstate->num_tuples = numrows;
fsstate->next_tuple = 0;
- for (i = 0; i < numrows; i++)
+ /* Convert the data into HeapTuples */
+ for (i = 0 ; i < addrows; i++)
{
- fsstate->tuples[i] =
+ fsstate->tuples[restrows + i] =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
@@ -2066,6 +2165,23 @@ fetch_more_data(ForeignScanState *node)
PQclear(res);
res = NULL;
+
+ if (cmd == ALLOW_ASYNC)
+ {
+ if (!fsstate->eof_reached)
+ {
+ /*
+ * We can immediately request the next bunch of tuples if
+ * we're on asynchronous connection.
+ */
+ if (!PFCsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ PFCsetAsyncScan(conn, fsstate);
+ }
+ }
+
+end_of_fetch:
+ ; /* Nothing to do here but needed to make compiler quiet. */
}
PG_CATCH();
{
@@ -2079,6 +2195,28 @@ fetch_more_data(ForeignScanState *node)
}
/*
+ * finish_async_query
+ *		Synchronously collect an asynchronous FETCH still outstanding on "conn".
+ *
+ * prepare_foreign_modify calls this before PFCprepare so that a new command
+ * is not issued while a scan's async request is pending on the connection.
+ *
+ * Nothing is done when no scan is registered as the async scan of "conn",
+ * or when that scan's connection is NULL, has exactly one scan, or has no
+ * async query running.  Otherwise the registered scan's pending result is
+ * read via fetch_more_data(..., FORCE_SYNC).
+ *
+ * NOTE(review): the "exactly one scan" early exit leaves an async query that
+ * was started through the START_ONLY path of fetch_more_data (which asserts
+ * nscans == 1) still running.  Confirm that callers cannot then issue another
+ * command on this connection, or drop that condition.
+ */
+void
+finish_async_query(PgFdwConn *conn)
+{
+	PgFdwScanState *pending = PFCgetAsyncScan(conn);
+	PgFdwConn  *scan_conn;
+
+	/* No scan has an async request registered on this connection. */
+	if (pending == NULL)
+		return;
+
+	scan_conn = pending->conn;
+	if (scan_conn != NULL &&
+		PFCgetNscans(scan_conn) != 1 &&
+		PFCisAsyncRunning(scan_conn))
+	{
+		/* Read the in-flight result; no async query may remain afterwards. */
+		fetch_more_data(PFCgetAsyncScan(scan_conn), FORCE_SYNC);
+		Assert(!PFCisAsyncRunning(scan_conn));
+	}
+}
+
+/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
*
@@ -2132,7 +2270,7 @@ reset_transmission_modes(int nestlevel)
* Utility routine to close a cursor.
*/
static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
{
char sql[64];
PGresult *res;
@@ -2143,7 +2281,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(conn, sql);
+ res = PFCexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -2165,6 +2303,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
GetPrepStmtNumber(fmstate->conn));
p_name = pstrdup(prep_name);
+ /* Finish async query if runing */
+ finish_async_query(fmstate->conn);
+
/*
* We intentionally do not specify parameter types here, but leave the
* remote server to derive them by default. This avoids possible problems
@@ -2175,11 +2316,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQprepare(fmstate->conn,
- p_name,
- fmstate->query,
- 0,
- NULL);
+ res = PFCprepare(fmstate->conn,
+ p_name,
+ fmstate->query,
+ 0,
+ NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2438,7 @@ postgresAnalyzeForeignTable(Relation relation,
ForeignTable *table;
ForeignServer *server;
UserMapping *user;
- PGconn *conn;
+ PgFdwConn *conn;
StringInfoData sql;
PGresult *volatile res = NULL;
@@ -2329,7 +2470,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = PFCexec(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2379,7 +2520,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
ForeignTable *table;
ForeignServer *server;
UserMapping *user;
- PGconn *conn;
+ PgFdwConn *conn;
unsigned int cursor_number;
StringInfoData sql;
PGresult *volatile res = NULL;
@@ -2423,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = PFCexec(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -2453,7 +2594,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
- res = PQexec(conn, fetch_sql);
+ res = PFCexec(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2723,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
bool import_not_null = true;
ForeignServer *server;
UserMapping *mapping;
- PGconn *conn;
+ PgFdwConn *conn;
StringInfoData buf;
PGresult *volatile res = NULL;
int numrows,
@@ -2615,7 +2756,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
conn = GetConnection(server, mapping, false);
/* Don't attempt to import collation if remote server hasn't got it */
- if (PQserverVersion(conn) < 90100)
+ if (PFCserverVersion(conn) < 90100)
import_collate = false;
/* Create workspace for strings */
@@ -2628,7 +2769,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = PQexec(conn, buf.data);
+ res = PFCexec(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -2723,7 +2864,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = PQexec(conn, buf.data);
+ res = PFCexec(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..b117a88 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,22 @@
#include "nodes/relation.h"
#include "utils/relcache.h"
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
+extern void finish_async_query(PgFdwConn *conn);
/* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
bool clear, const char *sql);
/* in option.c */
--
2.1.0.GIT
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers