Hello, that was a silly mistake: fetch_size was set to 10000 in the v4 patch. The v5 patch fixes that.
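Counting round trips shows why the fetch size dominates these timings. The sketch below is a minimal illustration, not code from the patch. It assumes the EOF rule of the stock postgres_fdw fetch loop: a FETCH that returns fewer than fetch_size rows ends the scan, so a row count that is an exact multiple of fetch_size costs one extra, empty FETCH. The helper name fetch_round_trips is hypothetical.

```c
#include <assert.h>

/*
 * Hypothetical helper: number of "FETCH n FROM cN" commands needed to
 * drain a cursor of nrows rows, assuming that a batch shorter than
 * fetch_size is what signals EOF.
 */
static long
fetch_round_trips(long nrows, long fetch_size)
{
	assert(nrows >= 0 && fetch_size > 0);

	/* every full batch, plus the final short (possibly empty) one */
	return nrows / fetch_size + 1;
}
```

For the 1,000,000-row table lt1 used in the measurements, this gives 10001 FETCH commands per scan at fetch_size = 100 but only 101 at fetch_size = 10000, which is consistent with the accidental 10000 in v4 making the join look much faster.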
> But the v4 patch mysteriously accelerates this query, to 6.5 seconds.
>
> > =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
> >    FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
> ...
> >  Execution time: 6512.043 ms

fetch_size was 10000 in that run. With fetch_size = 100 I got about
13.0 seconds, which is about 19% faster than the original.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

======= 15 17:18:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyot...@lab.ntt.co.jp> wrote in <20150116.171849.109146500.horiguchi.kyot...@lab.ntt.co.jp>
> I revised the patch so that async scan will be done more
> aggressively, and took execution time for two very simple cases.
>
> As a result, the simple seq scan gained 5% and the hash join of two
> foreign tables gained 150% (2.4 times faster).
>
> While measuring the performance, I noticed that each scan in a
> query runs all at once rather than alternating with the others in
> many cases, such as hash joins and sorted joins. So I modified the
> patch so that async fetch is done more aggressively. The new v4
> patch is attached. The following numbers are based on it.
>
> ========
> Simple seq scan for the first test.
>
> > CREATE TABLE lt1 (a int, b timestamp, c text);
> > CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host
> > 'localhost');
> > CREATE USER MAPPING FOR PUBLIC SERVER sv1;
> > CREATE FOREIGN TABLE ft1 () SERVER sv1 OPTIONS (table_name 'lt1');
> > INSERT INTO lt1 (SELECT a, now(), repeat('x', 128) FROM generate_series(0,
> > 999999) a);
>
> In this case, I took the average execution time over 10 runs of the
> following query for both master head and the patched version. The
> fetch size is 100.
>
> > postgres=# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT * FROM ft1;
> >                               QUERY PLAN
> > ------------------------------------------------------------------
> >  Foreign Scan on ft1 (actual time=0.795..4175.706 rows=1000000 loops=1)
> >  Planning time: 0.060 ms
> >  Execution time: 4276.043 ms
>
> master head  : avg = 4256.621, std dev = 17.099
> patched pgfdw: avg = 4036.463, std dev = 2.608
>
> The patched version is faster by about 5%. This should be purely the
> result of asynchronous fetching, not including the effect of starting
> remote execution early in ExecInit.
>
> Interestingly, as fetch_size gets larger, the gain rises in spite of
> the decrease in the number of queries sent.
>
> master head  : avg = 2622.759, std dev = 38.379
> patched pgfdw: avg = 2277.622, std dev = 27.269
>
> About 15% gain. And for 10000,
>
> master head  : avg = 2000.980, std dev = 6.434
> patched pgfdw: avg = 1616.793, std dev = 13.192
>
> 19%. It is natural that execution time decreases as the fetch size
> increases, but I haven't found the reason why the patch's gain also
> increases.
>
> ======================
>
> The second case is a simple join of two foreign tables sharing
> one connection.
>
> The master head runs this query in about 16 seconds with almost
> no fluctuation among multiple tries.
>
> > =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
> >    FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
> >                                  QUERY PLAN
> > ----------------------------------------------------------------------------
> >  Hash Join (actual time=7541.831..15924.631 rows=1000000 loops=1)
> >    Hash Cond: (x.a = y.a)
> >    ->  Foreign Scan on ft1 x (actual time=1.176..6553.480 rows=1000000
> >        loops=1)
> >    ->  Hash (actual time=7539.761..7539.761 rows=1000000 loops=1)
> >          Buckets: 32768  Batches: 64  Memory Usage: 2829kB
> >          ->  Foreign Scan on ft1 y (actual time=1.067..6529.165 rows=1000000
> >              loops=1)
> >  Planning time: 0.223 ms
> >  Execution time: 15973.916 ms
>
> But the v4 patch mysteriously accelerates this query, to 6.5 seconds.
>
> > =# EXPLAIN (ANALYZE ON, COSTS OFF) SELECT x.a, x.c, y.c
> >    FROM ft1 AS x JOIN ft1 AS y on x.a = y.a;
> >                                  QUERY PLAN
> > ----------------------------------------------------------------------------
> >  Hash Join (actual time=2556.977..5812.937 rows=1000000 loops=1)
> >    Hash Cond: (x.a = y.a)
> >    ->  Foreign Scan on ft1 x (actual time=32.689..1936.565 rows=1000000
> >        loops=1)
> >    ->  Hash (actual time=2523.810..2523.810 rows=1000000 loops=1)
> >          Buckets: 32768  Batches: 64  Memory Usage: 2829kB
> >          ->  Foreign Scan on ft1 y (actual time=50.345..1928.411 rows=1000000
> >              loops=1)
> >  Planning time: 0.220 ms
> >  Execution time: 6512.043 ms
>
> The result data does not seem broken. I don't know the reason yet, but
> I'll investigate it.
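The percentages in the mail can be re-derived from the quoted averages. The sketch below is a hedged check, not part of the patch: time_reduction() and speedup_ratio() are hypothetical helpers, and since the mail does not say which baseline each percentage uses, both readings are computed.

```c
#include <assert.h>
#include <stdio.h>

/* Fraction of the master run time saved by the patch: (m - p) / m. */
static double
time_reduction(double master_ms, double patched_ms)
{
	assert(master_ms > 0.0);
	return (master_ms - patched_ms) / master_ms;
}

/* How many times faster the patched run is: m / p. */
static double
speedup_ratio(double master_ms, double patched_ms)
{
	assert(patched_ms > 0.0);
	return master_ms / patched_ms;
}

/* Print both readings for one pair of averages. */
static void
report(const char *label, double master_ms, double patched_ms)
{
	printf("%-24s reduction %5.1f%%  ratio %.3f\n", label,
		   100.0 * time_reduction(master_ms, patched_ms),
		   speedup_ratio(master_ms, patched_ms));
}
```

For the fetch-size-100 seq scan, the reduction is about 5.2% and the ratio about 1.055, so "about 5%" fits either reading. "About 15%" matches the ratio for the second pair (about 1.152; the reduction is about 13.2%), while "19%" for fetch size 10000 matches the reduction (about 19.2%; the ratio is about 1.238). For the join, 15973.916 ms / 6512.043 ms is about 2.45, and the 13.0 s figure in the reply is an 18.6% reduction from 15973.916 ms.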
From faea77944d4d3e3332d9723958f548356e3bceba Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v5

This is the modified version of Asynchronous execution of postgres_fdw.

- Do async fetch more aggressively than v3.
- No additional tests yet :(
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 +++++++++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  61 ++++++++
 contrib/postgres_fdw/connection.c   |  82 ++++++-----
 contrib/postgres_fdw/postgres_fdw.c | 281 +++++++++++++++++++++++++++---------
 contrib/postgres_fdw/postgres_fdw.h |  15 +-
 6 files changed, 526 insertions(+), 115 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,200 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE() ((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c) free(c)
+
+struct pgfdw_conn
+{
+	PGconn	   *pgconn;			/* libpq connection for this connection */
+	int			nscans;			/* number of scans using this connection */
+	struct PgFdwScanState *async_scan;	/* the scan currently running an
+										 * async query on this connection */
+};
+
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn,
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+void
+PFCfinish(PgFdwConn *conn)
+{
+	PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn  *ret = PFC_ALLOCATE();
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..f695f5a
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;
+struct PgFdwScanState;
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int	PFCisAsyncRunning(PgFdwConn *conn);
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int	PFCgetNscans(PgFdwConn *conn);
+extern int	PFCincrementNscans(PgFdwConn *conn);
+extern int	PFCdecrementNscans(PgFdwConn *conn);
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);
+extern int	PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int	PFCconsumeInput(PgFdwConn *conn);
+extern int	PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int	PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int	PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname);
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..2517f6b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn  *conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry. We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,9 +161,12 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
+
 	}
 
+	PFCincrementNscans(entry->conn);
+
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
@@ -178,10 +181,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn  *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +226,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +249,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +266,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +315,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +351,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +382,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +411,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* An ongoing async query should be canceled if no scans are left */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_query(conn);
 }
 
 /*
@@ -429,7 +430,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +444,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +463,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +491,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +543,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +598,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +609,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* As above, make sure to clear any prepared stmts */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -620,17 +621,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p", PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
+			entry->conn = NULL;
 		}
 	}
 
@@ -676,6 +679,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan, if any, on an open connection */
+		if (entry->conn != NULL)
+			PFCcancelAsync(entry->conn);
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +707,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..f7b0207 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -123,6 +123,12 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateRetrievedAttrs
 };
 
+typedef enum fetch_mode {
+	START_ONLY,
+	FORCE_SYNC,
+	ALLOW_ASYNC
+} fetch_mode;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
@@ -136,7 +142,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +162,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -167,7 +174,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +305,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +313,9 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
+static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +336,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +988,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.
+	 */
+	if (PFCgetNscans(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1015,10 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+	{
+		finish_async_query(fsstate->conn);
+		create_cursor(fsstate);
+	}
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1027,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate, ALLOW_ASYNC);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1087,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1392,19 +1410,22 @@ postgresExecForeignInsert(EState *estate,
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1462,19 +1483,22 @@ postgresExecForeignUpdate(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1532,19 +1556,22 @@ postgresExecForeignDelete(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										NULL);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						  fmstate->p_name,
+						  fmstate->p_nums,
+						  p_values,
+						  NULL,
+						  NULL,
+						  0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1621,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1753,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1863,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1879,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1944,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn  *conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +2011,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,33 +2027,45 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * Start async scan if this is the first scan. See fetch_more_data() for
+	 * details.
+	 */
+	if (PFCgetNscans(conn) == 1)
+		fetch_more_data(fsstate, START_ONLY);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
 	/*
 	 * We'll store the tuples in the batch_cxt. First, flush the previous
-	 * batch.
+	 * batch, unless some tuples were left unread because an asynchronous
+	 * fetch was interrupted; keep them in that case. This happens at most
+	 * twice in succession.
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
-		int			numrows;
+		int			numrows, addrows, restrows;
+		HeapTuple  *tmptuples;
 		int			i;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
@@ -2036,20 +2074,81 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		if (PFCisAsyncRunning(conn))
+		{
+			Assert (cmd != START_ONLY);
+
+			/*
+			 * If the target fsstate differs from the scan for which the
+			 * current async fetch is running, store that fetch's result in
+			 * its own scan first, then fetch for the target synchronously.
+			 */
+			if (fsstate != PFCgetAsyncScan(conn))
+			{
+				fetch_more_data(PFCgetAsyncScan(conn), FORCE_SYNC);
+				res = PFCexec(conn, sql);
+			}
+			else
+			{
+				/* Get result of running async fetch */
+				res = PFCgetResult(conn);
+				if (PQntuples(res) == fetch_size)
+				{
+					/*
+					 * Connection state doesn't go to IDLE even if all data
+					 * has been sent to client for asynchronous query. One
+					 * more PQgetResult() is needed to reset the state to
+					 * IDLE. See PQexecFinish() for details.
+					 */
+					if (PFCgetResult(conn) != NULL)
+						elog(ERROR, "Connection status error.");
+				}
+			}
+			PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			if (cmd == START_ONLY)
+			{
+				Assert(PFCgetNscans(conn) == 1);
+
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Otherwise, execute the query synchronously */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res && PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
+		/* allocate tuple storage */
+		tmptuples = fsstate->tuples;
+		addrows = PQntuples(res);
+		restrows = fsstate->num_tuples - fsstate->next_tuple;
+		numrows = restrows + addrows;
 		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+
+		Assert(restrows == 0 || tmptuples);
+
+		/* copy unread tuples if any */
+		for (i = 0 ; i < restrows ; i++)
+			fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i];
+
 		fsstate->num_tuples = numrows;
 		fsstate->next_tuple = 0;
 
-		for (i = 0; i < numrows; i++)
+		/* Convert the data into HeapTuples */
+		for (i = 0 ; i < addrows; i++)
 		{
-			fsstate->tuples[i] =
+			fsstate->tuples[restrows + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -2066,6 +2165,23 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		if (cmd == ALLOW_ASYNC)
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next bunch of tuples if
+				 * we're on an asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				PFCsetAsyncScan(conn, fsstate);
+			}
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here; a label must be followed by a statement. */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2195,28 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Force any asynchronous query running on this connection to finish.
+ */ +void +finish_async_query(PgFdwConn *conn) +{ + PgFdwScanState *fsstate = PFCgetAsyncScan(conn); + PgFdwConn *async_conn; + + /* Nothing to do if no async connection */ + if (fsstate == NULL) return; + async_conn = fsstate->conn; + if (!async_conn || + PFCgetNscans(async_conn) == 1 || + !PFCisAsyncRunning(async_conn)) + return; + + fetch_more_data(PFCgetAsyncScan(async_conn), FORCE_SYNC); + + Assert(!PFCisAsyncRunning(async_conn)); +} + +/* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * @@ -2132,7 +2270,7 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PgFdwConn *conn, unsigned int cursor_number) { char sql[64]; PGresult *res; @@ -2143,7 +2281,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(conn, sql); + res = PFCexec(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -2165,6 +2303,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); + /* Finish async query if runing */ + finish_async_query(fmstate->conn); + /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems @@ -2175,11 +2316,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQprepare(fmstate->conn, - p_name, - fmstate->query, - 0, - NULL); + res = PFCprepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -2297,7 +2438,7 @@ postgresAnalyzeForeignTable(Relation relation, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; StringInfoData sql; PGresult *volatile res = NULL; @@ -2329,7 +2470,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PFCexec(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2379,7 +2520,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; unsigned int cursor_number; StringInfoData sql; PGresult *volatile res = NULL; @@ -2423,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PFCexec(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -2453,7 +2594,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn, fetch_sql); + res = PFCexec(conn, fetch_sql); /* On error, report the original query, not the FETCH. 
*/ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2582,7 +2723,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) bool import_not_null = true; ForeignServer *server; UserMapping *mapping; - PGconn *conn; + PgFdwConn *conn; StringInfoData buf; PGresult *volatile res = NULL; int numrows, @@ -2615,7 +2756,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) conn = GetConnection(server, mapping, false); /* Don't attempt to import collation if remote server hasn't got it */ - if (PQserverVersion(conn) < 90100) + if (PFCserverVersion(conn) < 90100) import_collate = false; /* Create workspace for strings */ @@ -2628,7 +2769,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn, buf.data); + res = PFCexec(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -2723,7 +2864,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfo(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn, buf.data); + res = PFCexec(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 950c6f7..b117a88 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -18,19 +18,22 @@ #include "nodes/relation.h" #include "utils/relcache.h" -#include "libpq-fe.h" +#include "PgFdwConn.h" + +struct PgFdwScanState; /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void finish_async_query(PgFdwConn *fsstate); /* in connection.c */ 
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user, +extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user, bool will_prep_stmt); -extern void ReleaseConnection(PGconn *conn); -extern unsigned int GetCursorNumber(PGconn *conn); -extern unsigned int GetPrepStmtNumber(PGconn *conn); -extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, +extern void ReleaseConnection(PgFdwConn *conn); +extern unsigned int GetCursorNumber(PgFdwConn *conn); +extern unsigned int GetPrepStmtNumber(PgFdwConn *conn); +extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn, bool clear, const char *sql); /* in option.c */ -- 2.1.0.GIT
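The buffer handling added to fetch_more_data() (keep the rows that were still unread when an asynchronous fetch was interrupted, then append the newly fetched rows after them) can be illustrated in isolation. This is a minimal, hypothetical sketch, not code from the patch: plain ints stand in for HeapTuples, calloc()/free() stand in for palloc0() and the batch memory context, and TupleBuffer and merge_fetched_rows() are invented names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for the tuple buffer in PgFdwScanState: "tuples"
 * holds num_tuples entries and next_tuple is the index of the next entry
 * the executor will read. Plain ints replace HeapTuples for illustration.
 */
typedef struct TupleBuffer
{
	int		   *tuples;
	int			num_tuples;
	int			next_tuple;
} TupleBuffer;

/*
 * Put the unread tail of the old batch first and the "addrows" newly
 * fetched rows after it, mirroring the restrows/addrows logic in the
 * patched fetch_more_data(). Returns 0 on success, or -1 if allocation
 * fails (in which case buf is left unchanged).
 */
static int
merge_fetched_rows(TupleBuffer *buf, const int *fetched, int addrows)
{
	int			restrows = buf->num_tuples - buf->next_tuple;
	int			numrows = restrows + addrows;
	int		   *newtuples;

	/* Same sanity check as the patch's Assert(restrows == 0 || tmptuples) */
	assert(restrows == 0 || buf->tuples != NULL);

	/* calloc(0, ...) may return NULL, so always ask for at least one slot */
	newtuples = calloc(numrows > 0 ? (size_t) numrows : 1, sizeof(int));
	if (newtuples == NULL)
		return -1;

	/* Copy the rows the executor has not consumed yet */
	if (restrows > 0)
		memcpy(newtuples, buf->tuples + buf->next_tuple,
			   (size_t) restrows * sizeof(int));

	/* Append the freshly fetched rows after them */
	if (addrows > 0)
		memcpy(newtuples + restrows, fetched, (size_t) addrows * sizeof(int));

	free(buf->tuples);
	buf->tuples = newtuples;
	buf->num_tuples = numrows;
	buf->next_tuple = 0;
	return 0;
}
```

For example, if the executor has consumed 3 of 4 buffered rows and the next FETCH returns 2 rows, the merged buffer holds 3 rows: the one unread row followed by the 2 new ones. The patch copies HeapTuple pointers rather than values and relies on not resetting batch_cxt while rows remain unread; the sketch frees the old array only because it has copied the values out.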
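How the single in-flight FETCH is handed between scans that share one connection can be modeled as a small state machine. This is a simplified, hypothetical model, not the patch's code: Conn and Scan stand in for PgFdwConn and PgFdwScanState, receive_batch() replaces PFCgetResult()/PFCexec(), and only row counts are tracked. The START_ONLY, ALLOW_ASYNC and FORCE_SYNC names are borrowed from the patch's fetch_mode values.

```c
#include <assert.h>
#include <stddef.h>

/* Modes named after the patch's fetch_mode values */
typedef enum { START_ONLY, ALLOW_ASYNC, FORCE_SYNC } FetchMode;

typedef struct Scan
{
	int			remaining;		/* rows still left on the remote cursor */
	int			buffered;		/* rows received but not yet consumed */
	int			eof;			/* set once a short batch arrives */
} Scan;

typedef struct Conn
{
	Scan	   *async_owner;	/* scan whose FETCH is in flight, or NULL */
	int			fetches_sent;	/* number of FETCH commands sent */
} Conn;

#define FETCH_SIZE 100

/* Pretend the remote server returned up to FETCH_SIZE rows for "scan" */
static void
receive_batch(Scan *scan)
{
	int			n = scan->remaining < FETCH_SIZE ? scan->remaining : FETCH_SIZE;

	scan->remaining -= n;
	scan->buffered += n;
	if (n < FETCH_SIZE)
		scan->eof = 1;
}

static void
fetch_more(Conn *conn, Scan *scan, FetchMode mode)
{
	if (conn->async_owner != NULL)
	{
		assert(mode != START_ONLY);
		if (conn->async_owner != scan)
		{
			/* Store the other scan's pending batch into that scan first */
			fetch_more(conn, conn->async_owner, FORCE_SYNC);
			/* ...then fetch synchronously for the requesting scan */
			conn->fetches_sent++;
			receive_batch(scan);
		}
		else
			receive_batch(scan);	/* collect our own in-flight batch */
		conn->async_owner = NULL;
	}
	else
	{
		conn->fetches_sent++;
		if (mode == START_ONLY)
		{
			/* Send only; the result is collected by a later call */
			conn->async_owner = scan;
			return;
		}
		receive_batch(scan);
	}

	/* Request the next batch right away unless the cursor is exhausted */
	if (mode == ALLOW_ASYNC && !scan->eof)
	{
		conn->fetches_sent++;
		conn->async_owner = scan;
	}
}
```

With a fetch size of 100, a first scan started with START_ONLY collects its batch on the next call and immediately sends another FETCH. If a second scan on the same connection then asks for rows, the first scan's pending batch is drained into its own buffer before the second scan is served synchronously, so no batch is lost. In the patch, START_ONLY is used only when the connection carries a single scan (PFCgetNscans(conn) == 1).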
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)