Hello, this is the 2nd session of 'introducing parallelism using postgres_fdw'.
The two attached patches are as follows: - 0001-Async-exec-of-postgres_fdw.patch Main patch, which includes all functions. - 0002-rename-PGConn-variable.patch Renaming the variable conn for readability. No functional effect. * Outline of this patch From some consideration after the previous discussion and comments from others, I judged that the original (WIP) patch was overdone as a first step. So I reduced the patch to the minimal functionality. The new patch does the following, - Wrapping PGconn by PgFdwConn in order to handle multiple scans on one connection. - The core async logic was added in fetch_more_data(). - Invoking remote commands asynchronously in ExecInitForeignScan. - Canceling async invocation if any other foreign scans, modifies, deletes use the same connection. Cancellation is done by immediately fetching the return of the already-invoked async command. * Where this patch will be effective. With the upcoming inheritance-partition feature, this patch enables starting and running foreign scans asynchronously. It will be more effective for longer TAT or remote startup times, and a larger number of foreign servers. No negative performance effect in other situations. * Concerns about this patch. - This breaks the assumption that a scan starts at ExecForeignScan, not ExecInitForeignScan, which might cause some problems. - The error reporting code in do_sql_command is quite ugly. regards, -- Kyotaro Horiguchi NTT Open Source Software Center
>From 4b56fcd0687172e3cccb329bc17e78935657f58f Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Fri, 28 Nov 2014 10:52:41 +0900 Subject: [PATCH 1/2] Async exec of postgres_fdw. --- contrib/postgres_fdw/connection.c | 102 ++++++++++++------- contrib/postgres_fdw/postgres_fdw.c | 191 ++++++++++++++++++++++++++++-------- contrib/postgres_fdw/postgres_fdw.h | 28 +++++- 3 files changed, 242 insertions(+), 79 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 116be7d..8b1c738 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -44,7 +44,7 @@ typedef struct ConnCacheKey typedef struct ConnCacheEntry { ConnCacheKey key; /* hash key (must be first) */ - PGconn *conn; /* connection to foreign server, or NULL */ + PgFdwConn *conn; /* connection to foreign server, or NULL */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ @@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, * be useful and not mere pedantry. We could not flush any active connections * mid-transaction anyway. 
*/ -PGconn * +PgFdwConn * GetConnection(ForeignServer *server, UserMapping *user, bool will_prep_stmt) { @@ -160,16 +160,36 @@ GetConnection(ForeignServer *server, UserMapping *user, entry->xact_depth = 0; /* just to be sure */ entry->have_prep_stmt = false; entry->have_error = false; - entry->conn = connect_pg_server(server, user); + + /* This shoud be in the same memory context with the hashtable */ + entry->conn = + (PgFdwConn *) MemoryContextAllocZero(CacheMemoryContext, + sizeof(PgFdwConn)); + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", - entry->conn, server->servername); + entry->conn->conn, server->servername); } + if (entry->conn->conn == NULL) + { + entry->conn->conn = connect_pg_server(server, user); + entry->conn->nscans = 0; + entry->conn->async_state = PGFDW_CONN_IDLE; + entry->conn->async_scan = NULL; + } /* * Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); + /* + * Cancel async query if there's another foreign scan node sharing this + * connection. + */ + if (++entry->conn->nscans > 1 && + entry->conn->async_state == PGFDW_CONN_ASYNC_RUNNING) + fetch_more_data(entry->conn->async_scan); + /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; @@ -182,7 +202,7 @@ GetConnection(ForeignServer *server, UserMapping *user, static PGconn * connect_pg_server(ForeignServer *server, UserMapping *user) { - PGconn *volatile conn = NULL; + PGconn *volatile conn = NULL; /* * Use PG_TRY block to ensure closing connection on error. 
@@ -355,7 +375,12 @@ do_sql_command(PGconn *conn, const char *sql) res = PQexec(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); + { + PgFdwConn tmpfdwconn; + + tmpfdwconn.conn = conn; + pgfdw_report_error(ERROR, res, &tmpfdwconn, true, sql); + } PQclear(res); } @@ -380,13 +405,13 @@ begin_remote_xact(ConnCacheEntry *entry) const char *sql; elog(DEBUG3, "starting remote transaction on connection %p", - entry->conn); + &entry->conn); if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn->conn, sql); entry->xact_depth = 1; } @@ -400,7 +425,7 @@ begin_remote_xact(ConnCacheEntry *entry) char sql[64]; snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn->conn, sql); entry->xact_depth++; } } @@ -409,13 +434,13 @@ begin_remote_xact(ConnCacheEntry *entry) * Release connection reference count created by calling GetConnection. */ void -ReleaseConnection(PGconn *conn) +ReleaseConnection(PgFdwConn *conn) { - /* - * Currently, we don't actually track connection references because all - * cleanup is managed on a transaction or subtransaction basis instead. So - * there's nothing to do here. - */ + if (--conn->nscans == 0) + { + if (conn->async_scan) + finish_async_connection(conn->async_scan); + } } /* @@ -430,7 +455,7 @@ ReleaseConnection(PGconn *conn) * collisions are highly improbable; just be sure to use %u not %d to print. */ unsigned int -GetCursorNumber(PGconn *conn) +GetCursorNumber(PgFdwConn *conn) { return ++cursor_number; } @@ -444,7 +469,7 @@ GetCursorNumber(PGconn *conn) * increasing the risk of prepared-statement name collisions by resetting. 
*/ unsigned int -GetPrepStmtNumber(PGconn *conn) +GetPrepStmtNumber(PgFdwConn *conn) { return ++prep_stmt_number; } @@ -463,7 +488,7 @@ GetPrepStmtNumber(PGconn *conn) * marked with have_error = true. */ void -pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, +pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn, bool clear, const char *sql) { /* If requested, PGresult must be released before leaving this function. */ @@ -491,7 +516,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, * return NULL, not a PGresult at all. */ if (message_primary == NULL) - message_primary = PQerrorMessage(conn); + message_primary = PQerrorMessage(conn->conn); ereport(elevel, (errcode(sqlstate), @@ -536,20 +561,20 @@ pgfdw_xact_callback(XactEvent event, void *arg) PGresult *res; /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) + if (entry->conn->conn == NULL) continue; /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); + entry->conn->conn); switch (event) { case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ - do_sql_command(entry->conn, "COMMIT TRANSACTION"); + do_sql_command(entry->conn->conn, "COMMIT TRANSACTION"); /* * If there were any errors in subtransactions, and we @@ -568,7 +593,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = PQexec(entry->conn->conn, "DEALLOCATE ALL"); PQclear(res); } entry->have_prep_stmt = false; @@ -598,7 +623,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Assume we might have lost track of prepared statements */ entry->have_error = true; /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn, "ABORT TRANSACTION"); + res = PQexec(entry->conn->conn, "ABORT TRANSACTION"); /* Note: can't throw 
ERROR, it would be infinite loop */ if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, @@ -609,7 +634,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* As above, make sure to clear any prepared stmts */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = PQexec(entry->conn->conn, "DEALLOCATE ALL"); PQclear(res); } entry->have_prep_stmt = false; @@ -621,17 +646,19 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; - + entry->conn->nscans = 0; + entry->conn->async_state = PGFDW_CONN_IDLE; + entry->conn->async_scan = NULL; /* * If the connection isn't in a good idle state, discard it to * recover. Next GetConnection will open a new connection. */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + if (PQstatus(entry->conn->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn->conn) != PQTRANS_IDLE) { - elog(DEBUG3, "discarding connection %p", entry->conn); - PQfinish(entry->conn); - entry->conn = NULL; + elog(DEBUG3, "discarding connection %p", entry->conn->conn); + PQfinish(entry->conn->conn); + entry->conn->conn = NULL; } } @@ -677,11 +704,18 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, PGresult *res; char sql[100]; + /* Shut down asynchronous scan if running */ + if (entry->conn->async_scan && PQisBusy(entry->conn->conn)) + PQconsumeInput(entry->conn->conn); + entry->conn->async_scan = NULL; + entry->conn->async_state = PGFDW_CONN_IDLE; + entry->conn->nscans = 0; + /* * We only care about connections with open remote subtransactions of * the current level. 
*/ - if (entry->conn == NULL || entry->xact_depth < curlevel) + if (entry->conn->conn == NULL || entry->xact_depth < curlevel) continue; if (entry->xact_depth > curlevel) @@ -692,7 +726,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, { /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn->conn, sql); } else { @@ -702,7 +736,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, snprintf(sql, sizeof(sql), "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", curlevel, curlevel); - res = PQexec(entry->conn, sql); + res = PQexec(entry->conn->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, sql); else diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index c3039a6..b912091 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -136,7 +136,7 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + PgFdwConn *conn; /* connection for the scan */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? 
*/ int numParams; /* number of parameters passed to query */ @@ -156,6 +156,7 @@ typedef struct PgFdwScanState /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ + ExprContext *econtext; /* copy of ps_ExprContext of ForeignScanState */ } PgFdwScanState; /* @@ -167,7 +168,7 @@ typedef struct PgFdwModifyState AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + PgFdwConn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -298,7 +299,7 @@ static void estimate_path_cost_size(PlannerInfo *root, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost); static void get_remote_estimate(const char *sql, - PGconn *conn, + PgFdwConn *conn, double *rows, int *width, Cost *startup_cost, @@ -306,9 +307,8 @@ static void get_remote_estimate(const char *sql, static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); -static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void create_cursor(PgFdwScanState *node); +static void close_cursor(PgFdwConn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, @@ -329,7 +329,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, MemoryContext temp_context); static void conversion_error_callback(void *arg); - /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. 
@@ -982,6 +981,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); else fsstate->param_values = NULL; + + fsstate->econtext = node->ss.ps.ps_ExprContext; + + /* + * Start scanning asynchronously if it is the first scan on this + * connection. + */ + if (fsstate->conn->nscans == 1) + create_cursor(fsstate); } /* @@ -1000,7 +1008,7 @@ postgresIterateForeignScan(ForeignScanState *node) * cursor on the remote side. */ if (!fsstate->cursor_exists) - create_cursor(node); + create_cursor(fsstate); /* * Get some more tuples, if we've run out. @@ -1009,7 +1017,7 @@ postgresIterateForeignScan(ForeignScanState *node) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) - fetch_more_data(node); + fetch_more_data(fsstate); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); @@ -1069,7 +1077,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fsstate->conn, sql); + res = PQexec(fsstate->conn->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1398,7 +1406,7 @@ postgresExecForeignInsert(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, + res = PQexecPrepared(fmstate->conn->conn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1468,7 +1476,7 @@ postgresExecForeignUpdate(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexecPrepared(fmstate->conn, + res = PQexecPrepared(fmstate->conn->conn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1538,7 +1546,7 @@ postgresExecForeignDelete(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, + res = PQexecPrepared(fmstate->conn->conn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1594,7 +1602,7 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fmstate->conn, sql); + res = PQexec(fmstate->conn->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -1726,7 +1734,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_join_conds; StringInfoData sql; List *retrieved_attrs; - PGconn *conn; + PgFdwConn *conn; Selectivity local_sel; QualCost local_cost; @@ -1836,7 +1844,7 @@ estimate_path_cost_size(PlannerInfo *root, * The given "sql" must be an EXPLAIN command. */ static void -get_remote_estimate(const char *sql, PGconn *conn, +get_remote_estimate(const char *sql, PgFdwConn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost) { @@ -1852,7 +1860,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = PQexec(conn, sql); + res = PQexec(conn->conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -1917,13 +1925,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, * Create cursor for node's query with current parameter values. 
*/ static void -create_cursor(ForeignScanState *node) +create_cursor(PgFdwScanState *fsstate) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - ExprContext *econtext = node->ss.ps.ps_ExprContext; + ExprContext *econtext = fsstate->econtext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PgFdwConn *conn = fsstate->conn; StringInfoData buf; PGresult *res; @@ -1985,7 +1992,7 @@ create_cursor(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecParams(conn, buf.data, numParams, NULL, values, + res = PQexecParams(conn->conn, buf.data, numParams, NULL, values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); @@ -2001,15 +2008,18 @@ create_cursor(ForeignScanState *node) /* Clean up */ pfree(buf.data); + + /* Start async scan if this is the first scan */ + if (fsstate->conn->nscans == 1) + fetch_more_data(fsstate); } /* * Fetch some more rows from the node's cursor. */ -static void -fetch_more_data(ForeignScanState *node) +void +fetch_more_data(PgFdwScanState *fsstate) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; @@ -2024,7 +2034,7 @@ fetch_more_data(ForeignScanState *node) /* PGresult must be released before leaving this function. 
*/ PG_TRY(); { - PGconn *conn = fsstate->conn; + PgFdwConn *conn = fsstate->conn; char sql[64]; int fetch_size; int numrows; @@ -2036,9 +2046,63 @@ fetch_more_data(ForeignScanState *node) snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fetch_size, fsstate->cursor_number); - res = PQexec(conn, sql); + switch (conn->async_state) + { + case PGFDW_CONN_IDLE: + Assert(conn->async_scan == NULL); + + if (conn->nscans == 1) + { + conn->async_scan = fsstate; + + if (!PQsendQuery(conn->conn, sql)) + pgfdw_report_error(ERROR, res, conn, false, + fsstate->query); + + conn->async_state = PGFDW_CONN_ASYNC_RUNNING; + goto end_of_fetch; + } + + /* Synchronous query execution */ + conn->async_state = PGFDW_CONN_SYNC_RUNNING; + res = PQexec(conn->conn, sql); + break; + + case PGFDW_CONN_ASYNC_RUNNING: + Assert(conn->async_scan != NULL); + + res = PQgetResult(conn->conn); + if (PQntuples(res) == fetch_size) + { + /* + * Connection state doesn't go to IDLE even if all data + * has been sent to client for asynchronous query. One + * more PQgetResult() is needed to reset the state to + * IDLE. See PQexecFinish() for details. + */ + if (PQgetResult(conn->conn) != NULL) + elog(ERROR, "Connection status error."); + } + + if (conn->nscans == 1) + break; + + /* + * If nscans is more then 1, stop invoking command asynchronously + * for multiple scans on this connection. If nscan is zero, async + * command on this connection should be finished immediately. + */ + conn->async_state = PGFDW_CONN_SYNC_RUNNING; + break; + + default: + elog(ERROR, "unexpected async state : %d", conn->async_state); + break; + + } + /* On error, report the original query, not the FETCH. 
*/ - if (PQresultStatus(res) != PGRES_TUPLES_OK) + if (res && PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); /* Convert the data into HeapTuples */ @@ -2066,6 +2130,36 @@ fetch_more_data(ForeignScanState *node) PQclear(res); res = NULL; + + switch(conn->async_state) + { + case PGFDW_CONN_ASYNC_RUNNING: + if (!fsstate->eof_reached) + { + /* + * We can immediately request the next bunch of tuples if + * we're on asynchronous connection. + */ + if (!PQsendQuery(conn->conn, sql)) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + else + conn->async_state = PGFDW_CONN_IDLE; + break; + + + case PGFDW_CONN_SYNC_RUNNING: + conn->async_state = PGFDW_CONN_IDLE; + conn->async_scan = NULL; + break; + + default: + elog(ERROR, "Unexpedted async state: %d", conn->async_state); + break; + } + +end_of_fetch: + ; /* Nothing to do here but needed to make compiler quiet. */ } PG_CATCH(); { @@ -2079,6 +2173,23 @@ fetch_more_data(ForeignScanState *node) } /* + * Force cancelling async command state. + */ +void +finish_async_connection(PgFdwScanState *fsstate) +{ + /* Finish async command if any */ + if (fsstate->conn->async_state == PGFDW_CONN_ASYNC_RUNNING) + fetch_more_data(fsstate->conn->async_scan); + fsstate->conn->async_scan = NULL; + Assert(fsstate->conn->async_state == PGFDW_CONN_IDLE); + + /* Immediately discard the result */ + fsstate->next_tuple = 0; + fsstate->num_tuples = 0; +} + +/* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * @@ -2132,7 +2243,7 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. 
*/ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PgFdwConn *conn, unsigned int cursor_number) { char sql[64]; PGresult *res; @@ -2143,7 +2254,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(conn, sql); + res = PQexec(conn->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -2175,7 +2286,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQprepare(fmstate->conn, + res = PQprepare(fmstate->conn->conn, p_name, fmstate->query, 0, @@ -2297,7 +2408,7 @@ postgresAnalyzeForeignTable(Relation relation, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; StringInfoData sql; PGresult *volatile res = NULL; @@ -2329,7 +2440,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PQexec(conn->conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2379,7 +2490,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; unsigned int cursor_number; StringInfoData sql; PGresult *volatile res = NULL; @@ -2423,7 +2534,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. 
*/ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PQexec(conn->conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -2453,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn, fetch_sql); + res = PQexec(conn->conn, fetch_sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2582,7 +2693,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) bool import_not_null = true; ForeignServer *server; UserMapping *mapping; - PGconn *conn; + PgFdwConn *conn; StringInfoData buf; PGresult *volatile res = NULL; int numrows, @@ -2615,7 +2726,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) conn = GetConnection(server, mapping, false); /* Don't attempt to import collation if remote server hasn't got it */ - if (PQserverVersion(conn) < 90100) + if (PQserverVersion(conn->conn) < 90100) import_collate = false; /* Create workspace for strings */ @@ -2628,7 +2739,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn, buf.data); + res = PQexec(conn->conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -2723,7 +2834,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfo(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn, buf.data); + res = PQexec(conn->conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git 
a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 0382c55..2472451 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -20,17 +20,35 @@ #include "libpq-fe.h" +typedef enum PgFdwConnState +{ + PGFDW_CONN_IDLE, + PGFDW_CONN_ASYNC_RUNNING, + PGFDW_CONN_SYNC_RUNNING +} PgFdwConnState; + +typedef struct PgFdwConn +{ + PGconn *conn; + int nscans; + PgFdwConnState async_state; + struct PgFdwScanState *async_scan; +} PgFdwConn; + + /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void fetch_more_data(struct PgFdwScanState *node); +extern void finish_async_connection(struct PgFdwScanState *fsstate); /* in connection.c */ -extern PGconn *GetConnection(ForeignServer *server, UserMapping *user, +extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user, bool will_prep_stmt); -extern void ReleaseConnection(PGconn *conn); -extern unsigned int GetCursorNumber(PGconn *conn); -extern unsigned int GetPrepStmtNumber(PGconn *conn); -extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, +extern void ReleaseConnection(PgFdwConn *conn); +extern unsigned int GetCursorNumber(PgFdwConn *conn); +extern unsigned int GetPrepStmtNumber(PgFdwConn *conn); +extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn, bool clear, const char *sql); /* in option.c */ -- 2.1.0.GIT
>From 2d76622c655294b2e6b54fab606ab9c1501c17f0 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Thu, 4 Dec 2014 16:48:22 +0900 Subject: [PATCH 2/2] rename PGConn variable --- contrib/postgres_fdw/connection.c | 46 ++++++++++++++++++------------------- contrib/postgres_fdw/postgres_fdw.c | 40 ++++++++++++++++---------------- contrib/postgres_fdw/postgres_fdw.h | 2 +- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8b1c738..3d5c8dc 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -167,12 +167,12 @@ GetConnection(ForeignServer *server, UserMapping *user, sizeof(PgFdwConn)); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", - entry->conn->conn, server->servername); + entry->conn->pgconn, server->servername); } - if (entry->conn->conn == NULL) + if (entry->conn->pgconn == NULL) { - entry->conn->conn = connect_pg_server(server, user); + entry->conn->pgconn = connect_pg_server(server, user); entry->conn->nscans = 0; entry->conn->async_state = PGFDW_CONN_IDLE; entry->conn->async_scan = NULL; @@ -378,7 +378,7 @@ do_sql_command(PGconn *conn, const char *sql) { PgFdwConn tmpfdwconn; - tmpfdwconn.conn = conn; + tmpfdwconn.pgconn = conn; pgfdw_report_error(ERROR, res, &tmpfdwconn, true, sql); } PQclear(res); @@ -411,7 +411,7 @@ begin_remote_xact(ConnCacheEntry *entry) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; - do_sql_command(entry->conn->conn, sql); + do_sql_command(entry->conn->pgconn, sql); entry->xact_depth = 1; } @@ -425,7 +425,7 @@ begin_remote_xact(ConnCacheEntry *entry) char sql[64]; snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); - do_sql_command(entry->conn->conn, sql); + do_sql_command(entry->conn->pgconn, sql); entry->xact_depth++; } } @@ -516,7 +516,7 @@ pgfdw_report_error(int elevel, 
PGresult *res, PgFdwConn *conn, * return NULL, not a PGresult at all. */ if (message_primary == NULL) - message_primary = PQerrorMessage(conn->conn); + message_primary = PQerrorMessage(conn->pgconn); ereport(elevel, (errcode(sqlstate), @@ -561,20 +561,20 @@ pgfdw_xact_callback(XactEvent event, void *arg) PGresult *res; /* Ignore cache entry if no open connection right now */ - if (entry->conn->conn == NULL) + if (entry->conn->pgconn == NULL) continue; /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn->conn); + entry->conn->pgconn); switch (event) { case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ - do_sql_command(entry->conn->conn, "COMMIT TRANSACTION"); + do_sql_command(entry->conn->pgconn, "COMMIT TRANSACTION"); /* * If there were any errors in subtransactions, and we @@ -593,7 +593,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn->conn, "DEALLOCATE ALL"); + res = PQexec(entry->conn->pgconn, "DEALLOCATE ALL"); PQclear(res); } entry->have_prep_stmt = false; @@ -623,7 +623,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Assume we might have lost track of prepared statements */ entry->have_error = true; /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn->conn, "ABORT TRANSACTION"); + res = PQexec(entry->conn->pgconn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, @@ -634,7 +634,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* As above, make sure to clear any prepared stmts */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn->conn, "DEALLOCATE ALL"); + res = PQexec(entry->conn->pgconn, "DEALLOCATE ALL"); PQclear(res); } 
entry->have_prep_stmt = false; @@ -653,12 +653,12 @@ pgfdw_xact_callback(XactEvent event, void *arg) * If the connection isn't in a good idle state, discard it to * recover. Next GetConnection will open a new connection. */ - if (PQstatus(entry->conn->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn->conn) != PQTRANS_IDLE) + if (PQstatus(entry->conn->pgconn) != CONNECTION_OK || + PQtransactionStatus(entry->conn->pgconn) != PQTRANS_IDLE) { - elog(DEBUG3, "discarding connection %p", entry->conn->conn); - PQfinish(entry->conn->conn); - entry->conn->conn = NULL; + elog(DEBUG3, "discarding connection %p", entry->conn->pgconn); + PQfinish(entry->conn->pgconn); + entry->conn->pgconn = NULL; } } @@ -705,8 +705,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, char sql[100]; /* Shut down asynchronous scan if running */ - if (entry->conn->async_scan && PQisBusy(entry->conn->conn)) - PQconsumeInput(entry->conn->conn); + if (entry->conn->async_scan && PQisBusy(entry->conn->pgconn)) + PQconsumeInput(entry->conn->pgconn); entry->conn->async_scan = NULL; entry->conn->async_state = PGFDW_CONN_IDLE; entry->conn->nscans = 0; @@ -715,7 +715,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, * We only care about connections with open remote subtransactions of * the current level. 
*/ - if (entry->conn->conn == NULL || entry->xact_depth < curlevel) + if (entry->conn->pgconn == NULL || entry->xact_depth < curlevel) continue; if (entry->xact_depth > curlevel) @@ -726,7 +726,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, { /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); - do_sql_command(entry->conn->conn, sql); + do_sql_command(entry->conn->pgconn, sql); } else { @@ -736,7 +736,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, snprintf(sql, sizeof(sql), "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", curlevel, curlevel); - res = PQexec(entry->conn->conn, sql); + res = PQexec(entry->conn->pgconn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, sql); else diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index b912091..e82ec82 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -1077,7 +1077,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fsstate->conn->conn, sql); + res = PQexec(fsstate->conn->pgconn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1406,7 +1406,7 @@ postgresExecForeignInsert(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn->conn, + res = PQexecPrepared(fmstate->conn->pgconn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1476,7 +1476,7 @@ postgresExecForeignUpdate(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexecPrepared(fmstate->conn->conn, + res = PQexecPrepared(fmstate->conn->pgconn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1546,7 +1546,7 @@ postgresExecForeignDelete(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn->conn, + res = PQexecPrepared(fmstate->conn->pgconn, fmstate->p_name, fmstate->p_nums, p_values, @@ -1602,7 +1602,7 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fmstate->conn->conn, sql); + res = PQexec(fmstate->conn->pgconn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -1860,7 +1860,7 @@ get_remote_estimate(const char *sql, PgFdwConn *conn, /* * Execute EXPLAIN remotely. */ - res = PQexec(conn->conn, sql); + res = PQexec(conn->pgconn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -1992,7 +1992,7 @@ create_cursor(PgFdwScanState *fsstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexecParams(conn->conn, buf.data, numParams, NULL, values, + res = PQexecParams(conn->pgconn, buf.data, numParams, NULL, values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); @@ -2055,7 +2055,7 @@ fetch_more_data(PgFdwScanState *fsstate) { conn->async_scan = fsstate; - if (!PQsendQuery(conn->conn, sql)) + if (!PQsendQuery(conn->pgconn, sql)) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); @@ -2065,13 +2065,13 @@ fetch_more_data(PgFdwScanState *fsstate) /* Synchronous query execution */ conn->async_state = PGFDW_CONN_SYNC_RUNNING; - res = PQexec(conn->conn, sql); + res = PQexec(conn->pgconn, sql); break; case PGFDW_CONN_ASYNC_RUNNING: Assert(conn->async_scan != NULL); - res = PQgetResult(conn->conn); + res = PQgetResult(conn->pgconn); if (PQntuples(res) == fetch_size) { /* @@ -2080,7 +2080,7 @@ fetch_more_data(PgFdwScanState *fsstate) * more PQgetResult() is needed to reset the state to * IDLE. See PQexecFinish() for details. */ - if (PQgetResult(conn->conn) != NULL) + if (PQgetResult(conn->pgconn) != NULL) elog(ERROR, "Connection status error."); } @@ -2140,7 +2140,7 @@ fetch_more_data(PgFdwScanState *fsstate) * We can immediately request the next bunch of tuples if * we're on asynchronous connection. */ - if (!PQsendQuery(conn->conn, sql)) + if (!PQsendQuery(conn->pgconn, sql)) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); } else @@ -2254,7 +2254,7 @@ close_cursor(PgFdwConn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexec(conn->conn, sql); + res = PQexec(conn->pgconn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -2286,7 +2286,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQprepare(fmstate->conn->conn, + res = PQprepare(fmstate->conn->pgconn, p_name, fmstate->query, 0, @@ -2440,7 +2440,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn->conn, sql.data); + res = PQexec(conn->pgconn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2534,7 +2534,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn->conn, sql.data); + res = PQexec(conn->pgconn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -2564,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn->conn, fetch_sql); + res = PQexec(conn->pgconn, fetch_sql); /* On error, report the original query, not the FETCH. 
*/ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2726,7 +2726,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) conn = GetConnection(server, mapping, false); /* Don't attempt to import collation if remote server hasn't got it */ - if (PQserverVersion(conn->conn) < 90100) + if (PQserverVersion(conn->pgconn) < 90100) import_collate = false; /* Create workspace for strings */ @@ -2739,7 +2739,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn->conn, buf.data); + res = PQexec(conn->pgconn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -2834,7 +2834,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfo(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn->conn, buf.data); + res = PQexec(conn->pgconn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 2472451..5dfc04a 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -29,7 +29,7 @@ typedef enum PgFdwConnState typedef struct PgFdwConn { - PGconn *conn; + PGconn *pgconn; int nscans; PgFdwConnState async_state; struct PgFdwScanState *async_scan; -- 2.1.0.GIT
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers