Hello. This is the new version of the FDW asynchronous execution feature. As of the last commitfest, the status of this feature was as follows:
- Async execution is valuable to have.
- But giving the first kick in the ExecInit phase is wrong.

So the design outline of this version is as follows:

- The patch set consists of three parts. The first is the infrastructure
on the core side, the second is the code to enable asynchronous
execution in postgres_fdw, and the third is an alternative set of three
methods to adapt the fetch size, which makes asynchronous execution more
effective.

- When to give the first kick for async execution was a problem. The
ExecInit phase is not the right place, and the ExecProc phase does not
fit either. An extra phase such as ExecPreProc would be too invasive, so
I tried a "pre-exec callback" instead. Any node can register callbacks
during its initialization, and the registered callbacks are called just
before the ExecProc phase in the executor. The first patch adds the
functions and structs to enable this.

- The second part is unchanged from the previous version. It adds
PgFdwConn as an extended PGconn with some members to support
asynchronous execution. Asynchronous execution is kicked only for the
first ForeignScan node on a given foreign server, and that state lasts
until the next scan comes. This behavior is mainly controlled in
fetch_more_data(), and it limits the number of simultaneous executions
on one foreign server to one. This decision comes from the fact that no
reasonable method to limit the multiplicity of execution on a *single
peer* has been found so far.

- The third part is three trials of an adaptive fetch size feature. The
first is duration-based adaptation: the patch increases the fetch size
on every FETCH execution while trying to keep the duration of each FETCH
below 500 ms. It is not promising, though, because it looks very
unstable and its behavior is nearly unforeseeable. The second is based
on a byte-based FETCH feature: this patch adds to the FETCH command an
argument that limits the number of bytes (octets) to send. But this
might be an over-exposure of the internals.
The size is counted based on the internal representation of a tuple, and
the client needs to send the overhead of its internal tuple
representation in bytes. This is effective but quite ugly. The third is
the simplest and most straightforward way: add a foreign table option to
specify the fetch_size. The effect of this is also in doubt, since the
size of the tuples for one foreign table will vary according to the
returned-column list, but it is foreseeable for users and is a necessary
knob for those who want to tune it. The foreign server could also have
the same option as the default for its foreign tables, but this patch
has not added that.

The attached patches are the following:

- 0001-Add-infrastructure-of-pre-execution-callbacks.patch
  Infrastructure of pre-execution callbacks
- 0002-Allow-asynchronous-remote-query-of-postgres_fdw.patch
  FDW asynchronous execution feature
- 0003a-Add-experimental-POC-adaptive-fetch-size-feature.patch
  Adaptive fetch size alternative 1: duration-based control
- 0003b-POC-Experimental-fetch_by_size-feature.patch
  Adaptive fetch size alternative 2: FETCH by size
- 0003c-Add-foreign-table-option-to-set-fetch-size.patch
  Adaptive fetch size alternative 3: foreign table option

regards,
From eb621897d1410079c6458bc4d1914d1345eb77bc Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 26 Jun 2015 15:12:16 +0900
Subject: [PATCH 1/3] Add infrastructure of pre-execution callbacks.

Some executor nodes have work to do before plan tree execution. This
infrastructure provides that functionality.
---
 src/backend/executor/execMain.c  | 32 ++++++++++++++++++++++++++++++++
 src/backend/executor/execUtils.c |  2 ++
 src/include/nodes/execnodes.h    | 22 ++++++++++++++++++++++
 3 files changed, 56 insertions(+)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index a1561ce..51a86b2 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -764,6 +764,35 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
 }
 
+/*
+ * Register callbacks to be called just before plan execution.
+ */
+void
+RegisterPreExecCallback(PreExecCallback callback, EState *es, Node *nd,
+						void *arg)
+{
+	PreExecCallbackItem *item;
+
+	item = (PreExecCallbackItem *)
+		MemoryContextAlloc(es->es_query_cxt, sizeof(PreExecCallbackItem));
+	item->callback = callback;
+	item->node = nd;
+	item->arg = arg;
+
+	/* add the new item at the head of the chain */
+	item->next = es->es_preExecCallbacks;
+	es->es_preExecCallbacks = item;
+}
+
+/* Execute registered pre-exec callbacks */
+void
+RunPreExecCallbacks(EState *es)
+{
+	PreExecCallbackItem *item;
+
+	for (item = es->es_preExecCallbacks; item; item = item->next)
+		item->callback(es, item->node);
+}
+
 /* ----------------------------------------------------------------
  *		InitPlan
@@ -956,6 +985,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 	 */
 	planstate = ExecInitNode(plan, estate, eflags);
 
+	/* Execute pre-execution callbacks registered during ExecInitNode */
+	RunPreExecCallbacks(estate);
+
 	/*
 	 * Get the tuple descriptor describing the type of tuples to return.
 	 */
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 3c611b9..e80bc22 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -123,6 +123,8 @@ CreateExecutorState(void)
 
 	estate->es_rowMarks = NIL;
 
+	estate->es_preExecCallbacks = NULL;
+
 	estate->es_processed = 0;
 	estate->es_lastoid = InvalidOid;
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 541ee18..cb8d854 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,26 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+struct EState;
+
+/* ----------------
+ *	  Pre-execute callbacks
+ * ----------------
+ */
+typedef void (*PreExecCallback) (struct EState *estate, Node *node);
+typedef struct PreExecCallbackItem
+{
+	struct PreExecCallbackItem *next;
+	PreExecCallback callback;	/* function to call just before execution
+								 * starts */
+	Node	   *node;			/* node to process */
+	void	   *arg;			/* any extra arguments */
+} PreExecCallbackItem;
+
+void RegisterPreExecCallback(PreExecCallback callback, struct EState *es,
+							 Node *nd, void *arg);
+void RunPreExecCallbacks(struct EState *es);
+
 /* ----------------
  *	 EState information
  *
@@ -387,6 +407,8 @@ typedef struct EState
 
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
+	PreExecCallbackItem *es_preExecCallbacks;	/* pre-exec callbacks */
+
 	uint32		es_processed;	/* # of tuples processed */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
-- 
1.8.3.1
>From 8db5a4992cf0509b6c9f93d659a3ba3644f30fa9 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Fri, 26 Jun 2015 16:54:39 +0900 Subject: [PATCH 2/3] Allow asynchronous remote query of postgres_fdw. The new type PgFdwConn makes connection.c to be aware of running of asynchronous query. The first node on one connection is invoked prior to ExecProcNode. In addition to that, fetch_more_data() tries to keep asynchronous fetching as long as no other query starts to run on the same connection using the async-aware function of PgFdwConn. --- contrib/postgres_fdw/Makefile | 2 +- contrib/postgres_fdw/PgFdwConn.c | 200 ++++++++++++++++++++++++ contrib/postgres_fdw/PgFdwConn.h | 61 ++++++++ contrib/postgres_fdw/connection.c | 81 +++++----- contrib/postgres_fdw/postgres_fdw.c | 294 +++++++++++++++++++++++++++--------- contrib/postgres_fdw/postgres_fdw.h | 15 +- 6 files changed, 538 insertions(+), 115 deletions(-) create mode 100644 contrib/postgres_fdw/PgFdwConn.c create mode 100644 contrib/postgres_fdw/PgFdwConn.h diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index d2b98e1..d0913e2 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -1,7 +1,7 @@ # contrib/postgres_fdw/Makefile MODULE_big = postgres_fdw -OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES) +OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES) PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL" PG_CPPFLAGS = -I$(libpq_srcdir) diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c new file mode 100644 index 0000000..b13b597 --- /dev/null +++ b/contrib/postgres_fdw/PgFdwConn.c @@ -0,0 +1,200 @@ +/*------------------------------------------------------------------------- + * + * PgFdwConn.c + * PGconn extending wrapper to enable asynchronous query. 
+ * + * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgres_fdw/PgFdwConn.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "PgFdwConn.h" + +#define PFC_ALLOCATE() ((PgFdwConn *)malloc(sizeof(PgFdwConn))) +#define PFC_FREE(c) free(c) + +struct pgfdw_conn +{ + PGconn *pgconn; /* libpq connection for this connection */ + int nscans; /* number of scans using this connection */ + struct PgFdwScanState *async_scan; /* the connection currently running + * async query on this connection */ +}; + +void +PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan) +{ + conn->async_scan = scan; +} + +struct PgFdwScanState * +PFCgetAsyncScan(PgFdwConn *conn) +{ + return conn->async_scan; +} + +int +PFCisAsyncRunning(PgFdwConn *conn) +{ + return conn->async_scan != NULL; +} + +PGconn * +PFCgetPGconn(PgFdwConn *conn) +{ + return conn->pgconn; +} + +int +PFCgetNscans(PgFdwConn *conn) +{ + return conn->nscans; +} + +int +PFCincrementNscans(PgFdwConn *conn) +{ + return ++conn->nscans; +} + +int +PFCdecrementNscans(PgFdwConn *conn) +{ + Assert(conn->nscans > 0); + return --conn->nscans; +} + +void +PFCcancelAsync(PgFdwConn *conn) +{ + if (PFCisAsyncRunning(conn)) + PFCconsumeInput(conn); +} + +void +PFCinit(PgFdwConn *conn) +{ + conn->async_scan = NULL; + conn->nscans = 0; +} + +int +PFCsendQuery(PgFdwConn *conn, const char *query) +{ + return PQsendQuery(conn->pgconn, query); +} + +PGresult * +PFCexec(PgFdwConn *conn, const char *query) +{ + return PQexec(conn->pgconn, query); +} + +PGresult * +PFCexecParams(PgFdwConn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const * paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) +{ + return PQexecParams(conn->pgconn, + command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat); +} + +PGresult * 
+PFCprepare(PgFdwConn *conn, + const char *stmtName, const char *query, + int nParams, const Oid *paramTypes) +{ + return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes); +} + +PGresult * +PFCexecPrepared(PgFdwConn *conn, + const char *stmtName, + int nParams, + const char *const * paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) +{ + return PQexecPrepared(conn->pgconn, + stmtName, nParams, paramValues, paramLengths, + paramFormats, resultFormat); +} + +PGresult * +PFCgetResult(PgFdwConn *conn) +{ + return PQgetResult(conn->pgconn); +} + +int +PFCconsumeInput(PgFdwConn *conn) +{ + return PQconsumeInput(conn->pgconn); +} + +int +PFCisBusy(PgFdwConn *conn) +{ + return PQisBusy(conn->pgconn); +} + +ConnStatusType +PFCstatus(const PgFdwConn *conn) +{ + return PQstatus(conn->pgconn); +} + +PGTransactionStatusType +PFCtransactionStatus(const PgFdwConn *conn) +{ + return PQtransactionStatus(conn->pgconn); +} + +int +PFCserverVersion(const PgFdwConn *conn) +{ + return PQserverVersion(conn->pgconn); +} + +char * +PFCerrorMessage(const PgFdwConn *conn) +{ + return PQerrorMessage(conn->pgconn); +} + +int +PFCconnectionUsedPassword(const PgFdwConn *conn) +{ + return PQconnectionUsedPassword(conn->pgconn); +} + +void +PFCfinish(PgFdwConn *conn) +{ + PQfinish(conn->pgconn); + PFC_FREE(conn); +} + +PgFdwConn * +PFCconnectdbParams(const char *const * keywords, + const char *const * values, int expand_dbname) +{ + PgFdwConn *ret = PFC_ALLOCATE(); + + PFCinit(ret); + ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname); + + return ret; +} diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h new file mode 100644 index 0000000..f695f5a --- /dev/null +++ b/contrib/postgres_fdw/PgFdwConn.h @@ -0,0 +1,61 @@ +/*------------------------------------------------------------------------- + * + * PgFdwConn.h + * PGconn extending wrapper to enable asynchronous query. 
+ * + * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgres_fdw/PgFdwConn.h + * + *------------------------------------------------------------------------- + */ +#ifndef PGFDWCONN_H +#define PGFDWCONN_H + +#include "libpq-fe.h" + +typedef struct pgfdw_conn PgFdwConn; +struct PgFdwScanState; + +extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan); +extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn); +extern int PFCisAsyncRunning(PgFdwConn *conn); +extern PGconn *PFCgetPGconn(PgFdwConn *conn); +extern int PFCgetNscans(PgFdwConn *conn); +extern int PFCincrementNscans(PgFdwConn *conn); +extern int PFCdecrementNscans(PgFdwConn *conn); +extern void PFCcancelAsync(PgFdwConn *conn); +extern void PFCinit(PgFdwConn *conn); +extern int PFCsendQuery(PgFdwConn *conn, const char *query); +extern PGresult *PFCexec(PgFdwConn *conn, const char *query); +extern PGresult *PFCexecParams(PgFdwConn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const * paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat); +extern PGresult *PFCprepare(PgFdwConn *conn, + const char *stmtName, const char *query, + int nParams, const Oid *paramTypes); +extern PGresult *PFCexecPrepared(PgFdwConn *conn, + const char *stmtName, + int nParams, + const char *const * paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat); +extern PGresult *PFCgetResult(PgFdwConn *conn); +extern int PFCconsumeInput(PgFdwConn *conn); +extern int PFCisBusy(PgFdwConn *conn); +extern ConnStatusType PFCstatus(const PgFdwConn *conn); +extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn); +extern int PFCserverVersion(const PgFdwConn *conn); +extern char *PFCerrorMessage(const PgFdwConn *conn); +extern int PFCconnectionUsedPassword(const PgFdwConn *conn); +extern void PFCfinish(PgFdwConn *conn); +extern PgFdwConn 
*PFCconnectdbParams(const char *const * keywords, + const char *const * values, int expand_dbname); +#endif /* PGFDWCONN_H */ diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 1a1e5b5..790b675 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -44,7 +44,7 @@ typedef struct ConnCacheKey typedef struct ConnCacheEntry { ConnCacheKey key; /* hash key (must be first) */ - PGconn *conn; /* connection to foreign server, or NULL */ + PgFdwConn *conn; /* connection to foreign server, or NULL */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ @@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ -static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user); static void check_conn_params(const char **keywords, const char **values); -static void configure_remote_session(PGconn *conn); -static void do_sql_command(PGconn *conn, const char *sql); +static void configure_remote_session(PgFdwConn *conn); +static void do_sql_command(PgFdwConn *conn, const char *sql); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, @@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, * be useful and not mere pedantry. We could not flush any active connections * mid-transaction anyway. 
*/ -PGconn * +PgFdwConn * GetConnection(ForeignServer *server, UserMapping *user, bool will_prep_stmt) { @@ -161,9 +161,11 @@ GetConnection(ForeignServer *server, UserMapping *user, entry->have_error = false; entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", - entry->conn, server->servername); + PFCgetPGconn(entry->conn), server->servername); } + PFCincrementNscans(entry->conn); + /* * Start a new transaction or subtransaction if needed. */ @@ -178,10 +180,10 @@ GetConnection(ForeignServer *server, UserMapping *user, /* * Connect to remote server using specified server and user mapping properties. */ -static PGconn * +static PgFdwConn * connect_pg_server(ForeignServer *server, UserMapping *user) { - PGconn *volatile conn = NULL; + PgFdwConn *volatile conn = NULL; /* * Use PG_TRY block to ensure closing connection on error. @@ -223,14 +225,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user) /* verify connection parameters and make connection */ check_conn_params(keywords, values); - conn = PQconnectdbParams(keywords, values, false); - if (!conn || PQstatus(conn) != CONNECTION_OK) + conn = PFCconnectdbParams(keywords, values, false); + if (!conn || PFCstatus(conn) != CONNECTION_OK) { char *connmessage; int msglen; /* libpq typically appends a newline, strip that */ - connmessage = pstrdup(PQerrorMessage(conn)); + connmessage = pstrdup(PFCerrorMessage(conn)); msglen = strlen(connmessage); if (msglen > 0 && connmessage[msglen - 1] == '\n') connmessage[msglen - 1] = '\0'; @@ -246,7 +248,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user) * otherwise, he's piggybacking on the postgres server's user * identity. See also dblink_security_check() in contrib/dblink. 
*/ - if (!superuser() && !PQconnectionUsedPassword(conn)) + if (!superuser() && !PFCconnectionUsedPassword(conn)) ereport(ERROR, (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), errmsg("password is required"), @@ -263,7 +265,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user) { /* Release PGconn data structure if we managed to create one */ if (conn) - PQfinish(conn); + PFCfinish(conn); PG_RE_THROW(); } PG_END_TRY(); @@ -312,9 +314,9 @@ check_conn_params(const char **keywords, const char **values) * there are any number of ways to break things. */ static void -configure_remote_session(PGconn *conn) +configure_remote_session(PgFdwConn *conn) { - int remoteversion = PQserverVersion(conn); + int remoteversion = PFCserverVersion(conn); /* Force the search path to contain only pg_catalog (see deparse.c) */ do_sql_command(conn, "SET search_path = pg_catalog"); @@ -348,11 +350,11 @@ configure_remote_session(PGconn *conn) * Convenience subroutine to issue a non-data-returning SQL command to remote */ static void -do_sql_command(PGconn *conn, const char *sql) +do_sql_command(PgFdwConn *conn, const char *sql) { PGresult *res; - res = PQexec(conn, sql); + res = PFCexec(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -379,7 +381,7 @@ begin_remote_xact(ConnCacheEntry *entry) const char *sql; elog(DEBUG3, "starting remote transaction on connection %p", - entry->conn); + PFCgetPGconn(entry->conn)); if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; @@ -408,13 +410,11 @@ begin_remote_xact(ConnCacheEntry *entry) * Release connection reference count created by calling GetConnection. */ void -ReleaseConnection(PGconn *conn) +ReleaseConnection(PgFdwConn *conn) { - /* - * Currently, we don't actually track connection references because all - * cleanup is managed on a transaction or subtransaction basis instead. So - * there's nothing to do here. 
- */ + /* ongoing async query should be canceled if no scans left */ + if (PFCdecrementNscans(conn) == 0) + finish_async_query(conn); } /* @@ -429,7 +429,7 @@ ReleaseConnection(PGconn *conn) * collisions are highly improbable; just be sure to use %u not %d to print. */ unsigned int -GetCursorNumber(PGconn *conn) +GetCursorNumber(PgFdwConn *conn) { return ++cursor_number; } @@ -443,7 +443,7 @@ GetCursorNumber(PGconn *conn) * increasing the risk of prepared-statement name collisions by resetting. */ unsigned int -GetPrepStmtNumber(PGconn *conn) +GetPrepStmtNumber(PgFdwConn *conn) { return ++prep_stmt_number; } @@ -462,7 +462,7 @@ GetPrepStmtNumber(PGconn *conn) * marked with have_error = true. */ void -pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, +pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn, bool clear, const char *sql) { /* If requested, PGresult must be released before leaving this function. */ @@ -490,7 +490,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, * return NULL, not a PGresult at all. 
*/ if (message_primary == NULL) - message_primary = PQerrorMessage(conn); + message_primary = PFCerrorMessage(conn); ereport(elevel, (errcode(sqlstate), @@ -542,7 +542,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (entry->xact_depth > 0) { elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); + PFCgetPGconn(entry->conn)); switch (event) { @@ -568,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = PFCexec(entry->conn, "DEALLOCATE ALL"); PQclear(res); } entry->have_prep_stmt = false; @@ -600,7 +600,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Assume we might have lost track of prepared statements */ entry->have_error = true; /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn, "ABORT TRANSACTION"); + res = PFCexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, @@ -611,7 +611,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* As above, make sure to clear any prepared stmts */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = PFCexec(entry->conn, "DEALLOCATE ALL"); PQclear(res); } entry->have_prep_stmt = false; @@ -623,17 +623,19 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; + PFCcancelAsync(entry->conn); + PFCinit(entry->conn); /* * If the connection isn't in a good idle state, discard it to * recover. Next GetConnection will open a new connection. 
*/ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + if (PFCstatus(entry->conn) != CONNECTION_OK || + PFCtransactionStatus(entry->conn) != PQTRANS_IDLE) { - elog(DEBUG3, "discarding connection %p", entry->conn); - PQfinish(entry->conn); - entry->conn = NULL; + elog(DEBUG3, "discarding connection %p", + PFCgetPGconn(entry->conn)); + PFCfinish(entry->conn); } } @@ -679,6 +681,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, PGresult *res; char sql[100]; + /* Shut down asynchronous scan if running */ + PFCcancelAsync(entry->conn); + /* * We only care about connections with open remote subtransactions of * the current level. @@ -704,7 +709,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, snprintf(sql, sizeof(sql), "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", curlevel, curlevel); - res = PQexec(entry->conn, sql); + res = PFCexec(entry->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(WARNING, res, entry->conn, true, sql); else diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 6da01e1..40cac3b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -22,6 +22,7 @@ #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/execnodes.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" @@ -124,6 +125,13 @@ enum FdwModifyPrivateIndex FdwModifyPrivateRetrievedAttrs }; +typedef enum fetch_mode { + START_ONLY, + FORCE_SYNC, + ALLOW_ASYNC, + EXIT_ASYNC +} fetch_mode; + /* * Execution state of a foreign scan using postgres_fdw. 
*/ @@ -137,7 +145,7 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + PgFdwConn *conn; /* connection for the scan */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -157,6 +165,7 @@ typedef struct PgFdwScanState /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ + ExprContext *econtext; /* copy of ps_ExprContext of ForeignScanState */ } PgFdwScanState; /* @@ -168,7 +177,7 @@ typedef struct PgFdwModifyState AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + PgFdwConn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -299,7 +308,7 @@ static void estimate_path_cost_size(PlannerInfo *root, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost); static void get_remote_estimate(const char *sql, - PGconn *conn, + PgFdwConn *conn, double *rows, int *width, Cost *startup_cost, @@ -307,9 +316,9 @@ static void get_remote_estimate(const char *sql, static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); -static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void create_cursor(PgFdwScanState *fsstate); +static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd); +static void close_cursor(PgFdwConn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const 
char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, @@ -877,6 +886,21 @@ postgresGetForeignPlan(PlannerInfo *root, NIL /* no custom tlist */ ); } +/* call back function to kick the query to start on remote */ +static void +postgresPreExecCallback(EState *estate, Node *node) +{ + PgFdwScanState *fsstate = + (PgFdwScanState *)((ForeignScanState *)node)->fdw_state; + + create_cursor(fsstate); + /* + * Start async scan if this is the first scan. See fetch_more_data() for + * details + */ + fetch_more_data(fsstate, START_ONLY); +} + /* * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. @@ -988,6 +1012,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); else fsstate->param_values = NULL; + + fsstate->econtext = node->ss.ps.ps_ExprContext; + + /* + * Register this node to be asynchronously executed if this is the first + * scan on this connection + */ + if (PFCgetNscans(fsstate->conn) == 1) + RegisterPreExecCallback(postgresPreExecCallback, estate, + (Node*)node, NULL); } /* @@ -1006,7 +1040,10 @@ postgresIterateForeignScan(ForeignScanState *node) * cursor on the remote side. */ if (!fsstate->cursor_exists) - create_cursor(node); + { + finish_async_query(fsstate->conn); + create_cursor(fsstate); + } /* * Get some more tuples, if we've run out. @@ -1015,7 +1052,7 @@ postgresIterateForeignScan(ForeignScanState *node) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) - fetch_more_data(node); + fetch_more_data(fsstate, ALLOW_ASYNC); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); @@ -1075,7 +1112,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexec(fsstate->conn, sql); + res = PFCexec(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1411,19 +1448,22 @@ postgresExecForeignInsert(EState *estate, /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, NULL, slot); + /* Finish async query if runing */ + finish_async_query(fmstate->conn); + /* * Execute the prepared statement, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = PFCexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1481,19 +1521,22 @@ postgresExecForeignUpdate(EState *estate, (ItemPointer) DatumGetPointer(datum), slot); + /* Finish async query if runing */ + finish_async_query(fmstate->conn); + /* * Execute the prepared statement, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = PFCexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1551,19 +1594,22 @@ postgresExecForeignDelete(EState *estate, (ItemPointer) DatumGetPointer(datum), NULL); + /* Finish async query if runing */ + finish_async_query(fmstate->conn); + /* * Execute the prepared statement, and check for success. 
* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = PFCexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1613,7 +1659,7 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fmstate->conn, sql); + res = PFCexec(fmstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -1745,7 +1791,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_join_conds; StringInfoData sql; List *retrieved_attrs; - PGconn *conn; + PgFdwConn *conn; Selectivity local_sel; QualCost local_cost; @@ -1855,7 +1901,7 @@ estimate_path_cost_size(PlannerInfo *root, * The given "sql" must be an EXPLAIN command. */ static void -get_remote_estimate(const char *sql, PGconn *conn, +get_remote_estimate(const char *sql, PgFdwConn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost) { @@ -1871,7 +1917,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = PQexec(conn, sql); + res = PFCexec(conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -1936,13 +1982,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, * Create cursor for node's query with current parameter values. 
*/ static void -create_cursor(ForeignScanState *node) +create_cursor(PgFdwScanState *fsstate) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - ExprContext *econtext = node->ss.ps.ps_ExprContext; + ExprContext *econtext = fsstate->econtext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PgFdwConn *conn = fsstate->conn; StringInfoData buf; PGresult *res; @@ -2004,8 +2049,8 @@ create_cursor(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecParams(conn, buf.data, numParams, NULL, values, - NULL, NULL, 0); + res = PFCexecParams(conn, buf.data, numParams, NULL, values, + NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); @@ -2026,54 +2071,121 @@ create_cursor(ForeignScanState *node) * Fetch some more rows from the node's cursor. */ static void -fetch_more_data(ForeignScanState *node) +fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; /* * We'll store the tuples in the batch_cxt. First, flush the previous - * batch. + * batch unless we are exiting from async mode. */ - fsstate->tuples = NULL; - MemoryContextReset(fsstate->batch_cxt); + if (cmd != EXIT_ASYNC) + { + fsstate->tuples = NULL; + MemoryContextReset(fsstate->batch_cxt); + } oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = fsstate->conn; + PgFdwConn *conn = fsstate->conn; char sql[64]; int fetch_size; - int numrows; + int numrows, addrows, restrows; + HeapTuple *tmptuples; int i; + int fetch_buf_size; /* The fetch size is arbitrary, but shouldn't be enormous.
*/ fetch_size = 100; + /* Make the query to fetch tuples */ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fetch_size, fsstate->cursor_number); - res = PQexec(conn, sql); + if (PFCisAsyncRunning(conn)) + { + Assert (cmd != START_ONLY); + + /* + * If the target fsstate is different from the scan state that the + * current async fetch is running for, the result should be stored + * into it, then synchronously fetch data for the target fsstate. + */ + if (fsstate != PFCgetAsyncScan(conn)) + { + fetch_more_data(PFCgetAsyncScan(conn), EXIT_ASYNC); + res = PFCexec(conn, sql); + } + else + { + /* Get result of running async fetch */ + res = PFCgetResult(conn); + if (PQntuples(res) == fetch_size) + { + /* + * Connection state doesn't go to IDLE even if all data + * has been sent to client for asynchronous query. One + * more PQgetResult() is needed to reset the state to + * IDLE. See PQexecFinish() for details. + */ + if (PFCgetResult(conn) != NULL) + elog(ERROR, "Connection status error."); + } + } + PFCsetAsyncScan(conn, NULL); + } + else + { + if (cmd == START_ONLY) + { + if (!PFCsendQuery(conn, sql)) + pgfdw_report_error(ERROR, res, conn, false, + fsstate->query); + + PFCsetAsyncScan(conn, fsstate); + goto end_of_fetch; + } + + /* Otherwise do synchronous query execution */ + PFCsetAsyncScan(conn, NULL); + res = PFCexec(conn, sql); + } + /* On error, report the original query, not the FETCH.
*/ - if (PQresultStatus(res) != PGRES_TUPLES_OK) + if (res && PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); - /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + /* allocate tuple storage */ + tmptuples = fsstate->tuples; + addrows = PQntuples(res); + restrows = fsstate->num_tuples - fsstate->next_tuple; + numrows = restrows + addrows; + fetch_buf_size = numrows * sizeof(HeapTuple); + fsstate->tuples = (HeapTuple *) palloc0(fetch_buf_size); + + Assert(restrows == 0 || tmptuples); + + /* copy unread tuples if any */ + for (i = 0 ; i < restrows ; i++) + fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i]; + fsstate->num_tuples = numrows; fsstate->next_tuple = 0; - for (i = 0; i < numrows; i++) + /* Convert the data into HeapTuples */ + for (i = 0 ; i < addrows; i++) { - fsstate->tuples[i] = + HeapTuple tup = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, fsstate->temp_cxt); + fsstate->tuples[restrows + i] = tup; + fetch_buf_size += (HEAPTUPLESIZE + tup->t_len); } /* Update fetch_ct_2 */ @@ -2085,6 +2197,23 @@ fetch_more_data(ForeignScanState *node) PQclear(res); res = NULL; + + if (cmd == ALLOW_ASYNC) + { + if (!fsstate->eof_reached) + { + /* + * We can immediately request the next bunch of tuples if + * we're on asynchronous connection. + */ + if (!PFCsendQuery(conn, sql)) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + PFCsetAsyncScan(conn, fsstate); + } + } + +end_of_fetch: + ; /* Nothing to do here but needed to make compiler quiet. */ } PG_CATCH(); { @@ -2098,6 +2227,28 @@ fetch_more_data(ForeignScanState *node) } /* + * Force cancelling async command state. 
+ */ +void +finish_async_query(PgFdwConn *conn) +{ + PgFdwScanState *fsstate = PFCgetAsyncScan(conn); + PgFdwConn *async_conn; + + /* Nothing to do if no async scan is registered */ + if (fsstate == NULL) return; + async_conn = fsstate->conn; + if (!async_conn || + PFCgetNscans(async_conn) == 1 || + !PFCisAsyncRunning(async_conn)) + return; + + fetch_more_data(PFCgetAsyncScan(async_conn), EXIT_ASYNC); + + Assert(!PFCisAsyncRunning(async_conn)); +} + +/* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. * @@ -2151,7 +2302,7 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PgFdwConn *conn, unsigned int cursor_number) { char sql[64]; PGresult *res; @@ -2162,7 +2313,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(conn, sql); + res = PFCexec(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -2184,6 +2335,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); + /* Finish async query if running */ + finish_async_query(fmstate->conn); + /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems @@ -2194,11 +2348,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult.
*/ - res = PQprepare(fmstate->conn, - p_name, - fmstate->query, - 0, - NULL); + res = PFCprepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -2316,7 +2470,7 @@ postgresAnalyzeForeignTable(Relation relation, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; StringInfoData sql; PGresult *volatile res = NULL; @@ -2348,7 +2502,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PFCexec(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2398,7 +2552,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, ForeignTable *table; ForeignServer *server; UserMapping *user; - PGconn *conn; + PgFdwConn *conn; unsigned int cursor_number; StringInfoData sql; PGresult *volatile res = NULL; @@ -2442,7 +2596,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = PFCexec(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -2472,7 +2626,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn, fetch_sql); + res = PFCexec(conn, fetch_sql); /* On error, report the original query, not the FETCH. 
*/ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -2600,7 +2754,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) bool import_not_null = true; ForeignServer *server; UserMapping *mapping; - PGconn *conn; + PgFdwConn *conn; StringInfoData buf; PGresult *volatile res = NULL; int numrows, @@ -2633,7 +2787,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) conn = GetConnection(server, mapping, false); /* Don't attempt to import collation if remote server hasn't got it */ - if (PQserverVersion(conn) < 90100) + if (PFCserverVersion(conn) < 90100) import_collate = false; /* Create workspace for strings */ @@ -2646,7 +2800,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn, buf.data); + res = PFCexec(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -2741,7 +2895,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfo(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn, buf.data); + res = PFCexec(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 3835ddb..c87e5cf 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -18,19 +18,22 @@ #include "nodes/relation.h" #include "utils/relcache.h" -#include "libpq-fe.h" +#include "PgFdwConn.h" + +struct PgFdwScanState; /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void finish_async_query(PgFdwConn *fsstate); /* in connection.c */ 
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user, +extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user, bool will_prep_stmt); -extern void ReleaseConnection(PGconn *conn); -extern unsigned int GetCursorNumber(PGconn *conn); -extern unsigned int GetPrepStmtNumber(PGconn *conn); -extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, +extern void ReleaseConnection(PgFdwConn *conn); +extern unsigned int GetCursorNumber(PgFdwConn *conn); +extern unsigned int GetPrepStmtNumber(PgFdwConn *conn); +extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn, bool clear, const char *sql); /* in option.c */ -- 1.8.3.1
>From 307209588737de34573d39d2b2376ce6f689a0f6 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Fri, 26 Jun 2015 17:31:26 +0900 Subject: [PATCH 3/3] Add experimental (POC) adaptive fetch size feature. --- contrib/postgres_fdw/postgres_fdw.c | 114 ++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 6 deletions(-) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 40cac3b..108b4ba 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -48,6 +48,27 @@ PG_MODULE_MAGIC; /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */ #define DEFAULT_FDW_TUPLE_COST 0.01 +/* Fetch size at startup. This might be better as a GUC parameter */ +#define MIN_FETCH_SIZE 100 + +/* Maximum fetch size. This might be better as a GUC parameter */ +#define MAX_FETCH_SIZE 1000 + +/* + * Maximum size for fetch buffer in kilobytes. Ditto. + * + * This should be far larger than sizeof(HeapTuple) * MAX_FETCH_SIZE. This is + * not a hard limit because we cannot know in advance the average row length + * returned. + */ +#define MAX_FETCH_BUFFER_SIZE 10000 /* 10MB */ + +/* Maximum duration allowed for a single fetch, in milliseconds */ +#define MAX_FETCH_DURATION 500 + +/* Number of successive async fetches before enlarging fetch_size */ +#define INCREASE_FETCH_SIZE_THRESHOLD 8 + /* * FDW-specific planner information kept in RelOptInfo.fdw_private for a * foreign table. This information is collected by postgresGetForeignRelSize.
@@ -157,6 +178,12 @@ typedef struct PgFdwScanState HeapTuple *tuples; /* array of currently-retrieved tuples */ int num_tuples; /* # of tuples in array */ int next_tuple; /* index of next one to return */ + int fetch_size; /* rows to be fetched at once */ + int successive_async; /* # of successive fetches at this + fetch_size */ + long last_fetch_req_at; /* The time of the last fetch request, in + * milliseconds */ + int last_buf_size; /* Buffer size required for the last fetch */ /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ @@ -886,6 +913,7 @@ postgresGetForeignPlan(PlannerInfo *root, NIL /* no custom tlist */ ); } + /* Callback function to kick off the query on the remote server */ static void postgresPreExecCallback(EState *estate, Node *node) @@ -1015,6 +1043,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->econtext = node->ss.ps.ps_ExprContext; + fsstate->fetch_size = MIN_FETCH_SIZE; + fsstate->successive_async = 0; + fsstate->last_buf_size = 0; + /* * Register this node to be asynchronously executed if this is the first * scan on this connection @@ -2092,18 +2124,72 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) { PgFdwConn *conn = fsstate->conn; char sql[64]; - int fetch_size; int numrows, addrows, restrows; HeapTuple *tmptuples; + int prev_fetch_size = fsstate->fetch_size; + int new_fetch_size = fsstate->fetch_size; int i; + struct timeval tv = {0, 0}; + long current_time; int fetch_buf_size; - /* The fetch size is arbitrary, but shouldn't be enormous. */ - fetch_size = 100; + gettimeofday(&tv, NULL); + current_time = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + /* + * Calculate adaptive fetch size + * + * Calculate fetch_size based on the maximum allowed duration and buffer + * space. The fetch buffer size shouldn't be enormous so we try to + * keep it under MAX_FETCH_BUFFER_SIZE.
+ */ + + /* Decrease fetch_size if the previous required buffer size exceeded + * MAX_FETCH_BUFFER_SIZE. */ + if (fsstate->last_buf_size > MAX_FETCH_BUFFER_SIZE) + { + new_fetch_size = + (int)((double)fsstate->fetch_size * MAX_FETCH_BUFFER_SIZE / + fsstate->last_buf_size); + } + /* + * Halve fetch_size if the last fetch took too long. + */ + if (PFCisBusy(conn) && + fsstate->fetch_size > MIN_FETCH_SIZE && + fsstate->last_fetch_req_at + MAX_FETCH_DURATION < + current_time) + { + int tmp_fetch_size = fsstate->fetch_size / 2; + if (tmp_fetch_size < new_fetch_size) + new_fetch_size = tmp_fetch_size; + } + + /* + * Double fetch_size if it has not been decreased above and the other + * conditions hold. + */ + if (new_fetch_size == fsstate->fetch_size && + fsstate->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD && + fsstate->fetch_size < MAX_FETCH_SIZE) + new_fetch_size *= 2; + + /* Change fetch_size as calculated above */ + if (new_fetch_size != fsstate->fetch_size) + { + if (new_fetch_size > MAX_FETCH_SIZE) + fsstate->fetch_size = MAX_FETCH_SIZE; + else if (new_fetch_size < MIN_FETCH_SIZE) + fsstate->fetch_size = MIN_FETCH_SIZE; + else + fsstate->fetch_size = new_fetch_size; + fsstate->successive_async = 0; + } /* Make the query to fetch tuples */ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fetch_size, fsstate->cursor_number); + fsstate->fetch_size, fsstate->cursor_number); if (PFCisAsyncRunning(conn)) { @@ -2123,7 +2209,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) { /* Get result of running async fetch */ res = PFCgetResult(conn); - if (PQntuples(res) == fetch_size) + if (PQntuples(res) == prev_fetch_size) { /* * Connection state doesn't go to IDLE even if all data @@ -2144,6 +2230,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) if (!PFCsendQuery(conn, sql)) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + fsstate->last_fetch_req_at = current_time; PFCsetAsyncScan(conn, fsstate); goto
end_of_fetch; @@ -2188,12 +2275,14 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) fetch_buf_size += (HEAPTUPLESIZE + tup->t_len); } + fsstate->last_buf_size = fetch_buf_size / 1024; /* in kilobytes */ + /* Update fetch_ct_2 */ if (fsstate->fetch_ct_2 < 2) fsstate->fetch_ct_2++; /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fetch_size); + fsstate->eof_reached = (numrows < prev_fetch_size); PQclear(res); res = NULL; @@ -2208,6 +2297,7 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) */ if (!PFCsendQuery(conn, sql)) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + fsstate->last_fetch_req_at = current_time; PFCsetAsyncScan(conn, fsstate); } } @@ -2223,6 +2313,18 @@ end_of_fetch: } PG_END_TRY(); + if (PFCisAsyncRunning(fsstate->conn)) + { + if (fsstate->successive_async < INCREASE_FETCH_SIZE_THRESHOLD) + fsstate->successive_async++; + } + else + { + /* Reset fetch_size if the async_fetch stopped */ + fsstate->successive_async = 0; + fsstate->fetch_size = MIN_FETCH_SIZE; + } + MemoryContextSwitchTo(oldcontext); } -- 1.8.3.1
>From e2cc7054bbab06c631d7c78491cb52143a4e47f9 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Mon, 29 Jun 2015 16:51:12 +0900 Subject: [PATCH] POC: Experimental fetch_by_size feature --- contrib/auto_explain/auto_explain.c | 8 +- contrib/pg_stat_statements/pg_stat_statements.c | 8 +- contrib/postgres_fdw/postgres_fdw.c | 92 +++++++++++++++----- src/backend/access/common/heaptuple.c | 42 +++++++++ src/backend/commands/copy.c | 2 +- src/backend/commands/createas.c | 2 +- src/backend/commands/explain.c | 2 +- src/backend/commands/extension.c | 2 +- src/backend/commands/matview.c | 2 +- src/backend/commands/portalcmds.c | 4 +- src/backend/commands/prepare.c | 2 +- src/backend/executor/execMain.c | 39 +++++++-- src/backend/executor/execUtils.c | 1 + src/backend/executor/functions.c | 2 +- src/backend/executor/spi.c | 4 +- src/backend/parser/gram.y | 65 ++++++++++++++ src/backend/tcop/postgres.c | 2 + src/backend/tcop/pquery.c | 109 +++++++++++++++++------- src/include/access/htup_details.h | 2 + src/include/executor/executor.h | 8 +- src/include/nodes/execnodes.h | 1 + src/include/nodes/parsenodes.h | 2 + src/include/tcop/pquery.h | 7 +- src/interfaces/ecpg/preproc/ecpg.addons | 83 ++++++++++++++++++ 24 files changed, 409 insertions(+), 82 deletions(-) diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c index 2a184ed..f121a33 100644 --- a/contrib/auto_explain/auto_explain.c +++ b/contrib/auto_explain/auto_explain.c @@ -57,7 +57,7 @@ void _PG_fini(void); static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags); static void explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, - long count); + long count, long size); static void explain_ExecutorFinish(QueryDesc *queryDesc); static void explain_ExecutorEnd(QueryDesc *queryDesc); @@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorRun hook: all we need do is track nesting depth */ static 
void -explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) +explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size) { nesting_level++; PG_TRY(); { if (prev_ExecutorRun) - prev_ExecutorRun(queryDesc, direction, count); + prev_ExecutorRun(queryDesc, direction, count, size); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, count, size); nesting_level--; } PG_CATCH(); diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 0eb991c..593d406 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -289,7 +289,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query); static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags); static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, - long count); + long count, long size); static void pgss_ExecutorFinish(QueryDesc *queryDesc); static void pgss_ExecutorEnd(QueryDesc *queryDesc); static void pgss_ProcessUtility(Node *parsetree, const char *queryString, @@ -870,15 +870,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorRun hook: all we need do is track nesting depth */ static void -pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) +pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size) { nested_level++; PG_TRY(); { if (prev_ExecutorRun) - prev_ExecutorRun(queryDesc, direction, count); + prev_ExecutorRun(queryDesc, direction, count, size); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, count, size); nested_level--; } PG_CATCH(); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 40cac3b..0419cde 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ 
b/contrib/postgres_fdw/postgres_fdw.c @@ -48,6 +48,11 @@ PG_MODULE_MAGIC; /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */ #define DEFAULT_FDW_TUPLE_COST 0.01 +/* Maximum tuples per fetch */ +#define MAX_FETCH_SIZE 10000 + +/* Maximum memory usable for retrieved data */ +#define MAX_FETCH_MEM (512 * 1024) /* * FDW-specific planner information kept in RelOptInfo.fdw_private for a * foreign table. This information is collected by postgresGetForeignRelSize. @@ -166,6 +171,8 @@ typedef struct PgFdwScanState MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ ExprContext *econtext; /* copy of ps_ExprContext of ForeignScanState */ + long max_palloced_mem; /* For test, remove me later */ + int max_numrows; } PgFdwScanState; /* @@ -331,6 +338,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static Size estimate_tuple_overhead(TupleDesc tupDesc, + List *retrieved_attrs); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -1138,6 +1147,7 @@ postgresEndForeignScan(ForeignScanState *node) if (fsstate == NULL) return; + elog(LOG, "Max memory for tuple store = %ld, max numrows = %d", fsstate->max_palloced_mem, fsstate->max_numrows); /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -2092,18 +2102,20 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) { PgFdwConn *conn = fsstate->conn; char sql[64]; - int fetch_size; + int fetch_mem; + int tuple_overhead; int numrows, addrows, restrows; HeapTuple *tmptuples; int i; int fetch_buf_size; - /* The fetch size is arbitrary, but shouldn't be enormous. 
*/ - fetch_size = 100; - - /* Make the query to fetch tuples */ - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fetch_size, fsstate->cursor_number); + tuple_overhead = estimate_tuple_overhead(fsstate->attinmeta->tupdesc, + fsstate->retrieved_attrs); + fetch_mem = MAX_FETCH_MEM - MAX_FETCH_SIZE * sizeof(HeapTuple); + snprintf(sql, sizeof(sql), "FETCH %d LIMIT %d (%d) FROM c%u", + MAX_FETCH_SIZE, + fetch_mem, tuple_overhead, + fsstate->cursor_number); if (PFCisAsyncRunning(conn)) { @@ -2123,17 +2135,15 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) { /* Get result of running async fetch */ res = PFCgetResult(conn); - if (PQntuples(res) == fetch_size) - { - /* - * Connection state doesn't go to IDLE even if all data - * has been sent to client for asynchronous query. One - * more PQgetResult() is needed to reset the state to - * IDLE. See PQexecFinish() for details. - */ - if (PFCgetResult(conn) != NULL) - elog(ERROR, "Connection status error."); - } + + /* + * Connection state doesn't go to IDLE even if all data + * has been sent to client for asynchronous query. One + * more PQgetResult() is needed to reset the state to + * IDLE. See PQexecFinish() for details. 
+ */ + if (PFCgetResult(conn) != NULL) + elog(ERROR, "Connection status error."); } PFCsetAsyncScan(conn, NULL); } @@ -2161,6 +2171,8 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) /* allocate tuple storage */ tmptuples = fsstate->tuples; addrows = PQntuples(res); + if (fsstate->max_numrows < addrows) + fsstate->max_numrows = addrows; restrows = fsstate->num_tuples - fsstate->next_tuple; numrows = restrows + addrows; fetch_buf_size = numrows * sizeof(HeapTuple); @@ -2188,12 +2200,15 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd) fetch_buf_size += (HEAPTUPLESIZE + tup->t_len); } + if (fsstate->max_palloced_mem < fetch_buf_size) + fsstate->max_palloced_mem = fetch_buf_size; + /* Update fetch_ct_2 */ if (fsstate->fetch_ct_2 < 2) fsstate->fetch_ct_2++; - /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fetch_size); + /* Must be EOF if we have no new tuple here. */ + fsstate->eof_reached = (addrows == 0); PQclear(res); res = NULL; @@ -3007,6 +3022,43 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) } /* + * Compute the estimated overhead of the result tuples + * See heap_form_tuple for the details of this calculation. + */ +static Size +estimate_tuple_overhead(TupleDesc tupDesc, + List *retrieved_attrs) +{ + Size size = 0; + int ncol = list_length(retrieved_attrs); + ListCell *lc; + + size += offsetof(HeapTupleHeaderData, t_bits); + size += BITMAPLEN(ncol); + + if (tupDesc->tdhasoid) + size += sizeof(Oid); + + size = MAXALIGN(size); + + size += sizeof(Datum) * ncol; + size += sizeof(bool) * ncol; + + foreach (lc, retrieved_attrs) + { + int i = lfirst_int(lc); + + if (i > 0) + { + if (tupDesc->attrs[i - 1]->attbyval) + size -= (sizeof(Datum) - tupDesc->attrs[i - 1]->attlen); + } + } + + return size; +} + +/* * Create a tuple from the specified row of the PGresult. 
* * rel is the local representation of the foreign table, attinmeta is diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index 09aea79..17525b5 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -133,6 +133,48 @@ heap_compute_data_size(TupleDesc tupleDesc, return data_length; } +Size +slot_compute_raw_data_size(TupleTableSlot *slot) +{ + TupleDesc tupleDesc = slot->tts_tupleDescriptor; + Datum *values = slot->tts_values; + bool *isnull = slot->tts_isnull; + Size data_length = 0; + int i; + int numberOfAttributes = tupleDesc->natts; + Form_pg_attribute *att = tupleDesc->attrs; + + if (slot->tts_nvalid < tupleDesc->natts) + heap_deform_tuple(slot->tts_tuple, tupleDesc, + slot->tts_values, slot->tts_isnull); + + for (i = 0; i < numberOfAttributes; i++) + { + Datum val; + Form_pg_attribute atti; + + if (isnull[i]) + continue; + + val = values[i]; + atti = att[i]; + + if (atti->attlen == -1) + { + data_length += toast_raw_datum_size(val); + } + else + { + data_length = att_align_datum(data_length, atti->attalign, + atti->attlen, val); + data_length = att_addlength_datum(data_length, atti->attlen, + val); + } + } + + return data_length; +} + /* * heap_fill_tuple * Load data portion of a tuple from values/isnull arrays diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 8904676..463fc67 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -1928,7 +1928,7 @@ CopyTo(CopyState cstate) else { /* run the plan --- the dest receiver will send tuples */ - ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L, 0); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 41183f6..7612391 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -192,7 +192,7 @@ 
ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString, dir = ForwardScanDirection; /* run the plan */ - ExecutorRun(queryDesc, dir, 0L); + ExecutorRun(queryDesc, dir, 0L, 0L, 0); /* save the rowcount if we're given a completionTag to fill */ if (completionTag) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 0d1ecc2..4480343 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -498,7 +498,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, dir = ForwardScanDirection; /* run the plan */ - ExecutorRun(queryDesc, dir, 0L); + ExecutorRun(queryDesc, dir, 0L, 0L, 0); /* run cleanup too */ ExecutorFinish(queryDesc); diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c index 2b1dcd0..bc116f9 100644 --- a/src/backend/commands/extension.c +++ b/src/backend/commands/extension.c @@ -733,7 +733,7 @@ execute_sql_string(const char *sql, const char *filename) dest, NULL, 0); ExecutorStart(qdesc, 0); - ExecutorRun(qdesc, ForwardScanDirection, 0); + ExecutorRun(qdesc, ForwardScanDirection, 0L, 0L, 0); ExecutorFinish(qdesc); ExecutorEnd(qdesc); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 5492e59..39e29ba 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -363,7 +363,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS); /* run the plan */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0); /* and clean up */ ExecutorFinish(queryDesc); diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 2794537..85fffc1 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -177,6 +177,8 @@ PerformPortalFetch(FetchStmt *stmt, nprocessed = PortalRunFetch(portal, stmt->direction, stmt->howMany, + stmt->howLarge, 
+ stmt->tupoverhead, dest); /* Return command status if wanted */ @@ -375,7 +377,7 @@ PersistHoldablePortal(Portal portal) true); /* Fetch the result set into the tuplestore */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0); (*queryDesc->dest->rDestroy) (queryDesc->dest); queryDesc->dest = NULL; diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index fb33d30..46fe4f8 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause, */ PortalStart(portal, paramLI, eflags, GetActiveSnapshot()); - (void) PortalRun(portal, count, false, dest, dest, completionTag); + (void) PortalRun(portal, count, 0L, 0, false, dest, dest, completionTag); PortalDrop(portal, false); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 51a86b2..5f0de97 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -79,6 +79,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate, CmdType operation, bool sendTuples, long numberTuples, + long sizeTuples, + int tupleOverhead, ScanDirection direction, DestReceiver *dest); static bool ExecCheckRTEPerms(RangeTblEntry *rte); @@ -277,17 +279,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) */ void ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count) + ScanDirection direction, long count, long size, int tupoverhead) { if (ExecutorRun_hook) - (*ExecutorRun_hook) (queryDesc, direction, count); + (*ExecutorRun_hook) (queryDesc, direction, + count, size, tupoverhead); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, + count, size, tupoverhead); } void standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count) + ScanDirection direction, + long count, long size, int tupoverhead) { EState *estate; 
CmdType operation; @@ -339,6 +344,8 @@ standard_ExecutorRun(QueryDesc *queryDesc, operation, sendTuples, count, + size, + tupoverhead, direction, dest); @@ -1551,22 +1558,27 @@ ExecutePlan(EState *estate, CmdType operation, bool sendTuples, long numberTuples, + long sizeTuples, + int tupleOverhead, ScanDirection direction, DestReceiver *dest) { TupleTableSlot *slot; long current_tuple_count; + long sent_size; /* * initialize local variables */ current_tuple_count = 0; - + sent_size = 0; /* * Set the direction. */ estate->es_direction = direction; + estate->es_stoppedbysize = false; + /* * Loop until we've processed the proper number of tuples from the plan. */ @@ -1621,6 +1633,23 @@ ExecutePlan(EState *estate, current_tuple_count++; if (numberTuples && numberTuples == current_tuple_count) break; + + if (sizeTuples > 0) + { + /* + * Count the size of tuples we've sent + * + * This needs all attributes deformed so a bit slow on some cases. + */ + sent_size += slot_compute_raw_data_size(slot) + tupleOverhead; + + /* Quit when the size limit will be exceeded by this tuple */ + if (sizeTuples < sent_size) + { + estate->es_stoppedbysize = true; + break; + } + } } } diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index e80bc22..6b59c05 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -126,6 +126,7 @@ CreateExecutorState(void) estate->es_preExecCallbacks = NULL; estate->es_processed = 0; + estate->es_stoppedbysize = false; estate->es_lastoid = InvalidOid; estate->es_top_eflags = 0; diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index ce49c47..7ab2e67 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -853,7 +853,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) /* Run regular commands to completion unless lazyEval */ long count = (es->lazyEval) ? 
1L : 0L; - ExecutorRun(es->qd, ForwardScanDirection, count); + ExecutorRun(es->qd, ForwardScanDirection, count, 0L, 0); /* * If we requested run to completion OR there was no tuple returned, diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index d544ad9..f29c3a8 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -2399,7 +2399,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount) ExecutorStart(queryDesc, eflags); - ExecutorRun(queryDesc, ForwardScanDirection, tcount); + ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L, 0); _SPI_current->processed = queryDesc->estate->es_processed; _SPI_current->lastoid = queryDesc->estate->es_lastoid; @@ -2477,7 +2477,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count, /* Run the cursor */ nfetched = PortalRunFetch(portal, direction, - count, + count, 0L, 0, dest); /* diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index e0ff6f1..b7b061c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -538,6 +538,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <str> opt_existing_window_name %type <boolean> opt_if_not_exists +%type <ival> opt_overhead + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. 
* They must be listed first so that their numeric codes do not depend on @@ -6066,6 +6068,16 @@ fetch_args: cursor_name n->howMany = $1; $$ = (Node *)n; } + | SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_FORWARD; + n->howMany = $1; + n->howLarge = $3; + n->tupoverhead = $4; + $$ = (Node *)n; + } | ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6074,6 +6086,16 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | ALL LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_FORWARD; + n->howMany = FETCH_ALL; + n->howLarge = $3; + n->tupoverhead = $4; + $$ = (Node *)n; + } | FORWARD opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6090,6 +6112,16 @@ fetch_args: cursor_name n->howMany = $2; $$ = (Node *)n; } + | FORWARD SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $7; + n->direction = FETCH_FORWARD; + n->howMany = $2; + n->howLarge = $4; + n->tupoverhead = $5; + $$ = (Node *)n; + } | FORWARD ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6098,6 +6130,16 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | FORWARD ALL LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $7; + n->direction = FETCH_FORWARD; + n->howMany = FETCH_ALL; + n->howLarge = $4; + n->tupoverhead = $5; + $$ = (Node *)n; + } | BACKWARD opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6114,6 +6156,16 @@ fetch_args: cursor_name n->howMany = $2; $$ = (Node *)n; } + | BACKWARD SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $7; + n->direction = FETCH_BACKWARD; + n->howMany = $2; + n->howLarge = $4; + 
n->tupoverhead = $5; + $$ = (Node *)n; + } | BACKWARD ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6122,6 +6174,16 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | BACKWARD ALL LIMIT Iconst opt_overhead opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $7; + n->direction = FETCH_BACKWARD; + n->howMany = FETCH_ALL; + n->howLarge = $4; + n->tupoverhead = $5; + $$ = (Node *)n; + } ; from_in: FROM {} @@ -6132,6 +6194,9 @@ opt_from_in: from_in {} | /* EMPTY */ {} ; +opt_overhead: '(' Iconst ')' { $$ = $2;} + | /* EMPTY */ { $$ = 0; } + ; /***************************************************************************** * diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index ce4bdaf..70641eb 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1103,6 +1103,7 @@ exec_simple_query(const char *query_string) */ (void) PortalRun(portal, FETCH_ALL, + 0L, 0, isTopLevel, receiver, receiver, @@ -1987,6 +1988,7 @@ exec_execute_message(const char *portal_name, long max_rows) completed = PortalRun(portal, max_rows, + 0L, 0, true, /* always top level */ receiver, receiver, diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 9c14e8a..ce9541a 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "access/xact.h" +#include "access/htup_details.h" #include "commands/prepare.h" #include "executor/tstoreReceiver.h" #include "miscadmin.h" @@ -39,9 +40,11 @@ static void ProcessQuery(PlannedStmt *plan, DestReceiver *dest, char *completionTag); static void FillPortalStore(Portal portal, bool isTopLevel); -static uint32 RunFromStore(Portal portal, ScanDirection direction, long count, +static uint32 RunFromStore(Portal portal, ScanDirection direction, + long count, long size, int tupoverhead, bool *stoppedbysize, DestReceiver *dest); -static long PortalRunSelect(Portal portal, bool 
forward, long count, +static long PortalRunSelect(Portal portal, bool forward, + long count, long size, int tupoverhead, DestReceiver *dest); static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel, DestReceiver *dest, char *completionTag); @@ -51,6 +54,8 @@ static void PortalRunMulti(Portal portal, bool isTopLevel, static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, + int tupoverhead, DestReceiver *dest); static void DoPortalRewind(Portal portal); @@ -182,7 +187,7 @@ ProcessQuery(PlannedStmt *plan, /* * Run the plan to completion. */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0); /* * Build command completion status string, if caller wants one. @@ -703,8 +708,8 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats) * suspended due to exhaustion of the count parameter. */ bool -PortalRun(Portal portal, long count, bool isTopLevel, - DestReceiver *dest, DestReceiver *altdest, +PortalRun(Portal portal, long count, long size, int tupoverhead, + bool isTopLevel, DestReceiver *dest, DestReceiver *altdest, char *completionTag) { bool result; @@ -787,7 +792,8 @@ PortalRun(Portal portal, long count, bool isTopLevel, /* * Now fetch desired portion of results.
*/ - nprocessed = PortalRunSelect(portal, true, count, dest); + nprocessed = PortalRunSelect(portal, true, + count, size, tupoverhead, dest); /* * If the portal result contains a command tag and the caller @@ -892,11 +898,14 @@ static long PortalRunSelect(Portal portal, bool forward, long count, + long size, + int tupoverhead, DestReceiver *dest) { QueryDesc *queryDesc; ScanDirection direction; uint32 nprocessed; + bool stoppedbysize; /* * NB: queryDesc will be NULL if we are fetching from a held cursor or a @@ -939,12 +948,15 @@ PortalRunSelect(Portal portal, count = 0; if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count, dest); + nprocessed = RunFromStore(portal, direction, + count, size, tupoverhead, + &stoppedbysize, dest); else { PushActiveSnapshot(queryDesc->snapshot); - ExecutorRun(queryDesc, direction, count); + ExecutorRun(queryDesc, direction, count, size, tupoverhead); nprocessed = queryDesc->estate->es_processed; + stoppedbysize = queryDesc->estate->es_stoppedbysize; PopActiveSnapshot(); } @@ -954,8 +966,9 @@ PortalRunSelect(Portal portal, if (nprocessed > 0) portal->atStart = false; /* OK to go backward now */ - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) + if ((count == 0 || + (unsigned long) nprocessed < (unsigned long) count) && + !stoppedbysize) portal->atEnd = true; /* we retrieved 'em all */ oldPos = portal->portalPos; portal->portalPos += nprocessed; @@ -982,12 +995,15 @@ PortalRunSelect(Portal portal, count = 0; if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count, dest); + nprocessed = RunFromStore(portal, direction, + count, size, tupoverhead, + &stoppedbysize, dest); else { PushActiveSnapshot(queryDesc->snapshot); - ExecutorRun(queryDesc, direction, count); + ExecutorRun(queryDesc, direction, count, size, tupoverhead); nprocessed = queryDesc->estate->es_processed; + stoppedbysize = queryDesc->estate->es_stoppedbysize; PopActiveSnapshot(); } @@ -998,8 +1014,9 @@ 
PortalRunSelect(Portal portal, portal->atEnd = false; /* OK to go forward now */ portal->portalPos++; /* adjust for endpoint case */ } - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) + if ((count == 0 || + (unsigned long) nprocessed < (unsigned long) count) && + !stoppedbysize) { portal->atStart = true; /* we retrieved 'em all */ portal->portalPos = 0; @@ -1088,11 +1105,15 @@ FillPortalStore(Portal portal, bool isTopLevel) * out for memory leaks. */ static uint32 -RunFromStore(Portal portal, ScanDirection direction, long count, - DestReceiver *dest) +RunFromStore(Portal portal, ScanDirection direction, + long count, long size_limit, int tupoverhead, + bool *stoppedbysize, DestReceiver *dest) { long current_tuple_count = 0; TupleTableSlot *slot; + long sent_size = 0; + + *stoppedbysize = false; slot = MakeSingleTupleTableSlot(portal->tupDesc); @@ -1122,6 +1143,9 @@ RunFromStore(Portal portal, ScanDirection direction, long count, break; (*dest->receiveSlot) (slot, dest); + /* Count the size of tuples we've sent */ + sent_size += slot_compute_raw_data_size(slot) + + tupoverhead; ExecClearTuple(slot); @@ -1133,10 +1157,19 @@ RunFromStore(Portal portal, ScanDirection direction, long count, current_tuple_count++; if (count && count == current_tuple_count) break; + + /* Quit when the size limit will be exceeded by this tuple */ + if (current_tuple_count > 0 && + size_limit > 0 && size_limit < sent_size) + { + *stoppedbysize = true; + break; + } } } (*dest->rShutdown) (dest); + elog(LOG, "Sent %ld bytes", sent_size); ExecDropSingleTupleTableSlot(slot); @@ -1385,6 +1418,8 @@ long PortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, + int tupoverhead, DestReceiver *dest) { long result; @@ -1422,7 +1457,8 @@ PortalRunFetch(Portal portal, switch (portal->strategy) { case PORTAL_ONE_SELECT: - result = DoPortalRunFetch(portal, fdirection, count, dest); + result = DoPortalRunFetch(portal, fdirection, + count, size, 
tupoverhead, dest); break; case PORTAL_ONE_RETURNING: @@ -1439,7 +1475,8 @@ PortalRunFetch(Portal portal, /* * Now fetch desired portion of results. */ - result = DoPortalRunFetch(portal, fdirection, count, dest); + result = DoPortalRunFetch(portal, fdirection, + count, size, tupoverhead, dest); break; default: @@ -1484,6 +1521,8 @@ static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, + int tupoverhead, DestReceiver *dest) { bool forward; @@ -1526,7 +1565,7 @@ DoPortalRunFetch(Portal portal, { DoPortalRewind(portal); if (count > 1) - PortalRunSelect(portal, true, count - 1, + PortalRunSelect(portal, true, count - 1, 0L, 0, None_Receiver); } else @@ -1536,13 +1575,15 @@ DoPortalRunFetch(Portal portal, if (portal->atEnd) pos++; /* need one extra fetch if off end */ if (count <= pos) - PortalRunSelect(portal, false, pos - count + 1, + PortalRunSelect(portal, false, + pos - count + 1, 0L, 0, None_Receiver); else if (count > pos + 1) - PortalRunSelect(portal, true, count - pos - 1, + PortalRunSelect(portal, true, + count - pos - 1, 0L, 0, None_Receiver); } - return PortalRunSelect(portal, true, 1L, dest); + return PortalRunSelect(portal, true, 1L, 0L, 0, dest); } else if (count < 0) { @@ -1553,17 +1594,19 @@ DoPortalRunFetch(Portal portal, * (Is it worth considering case where count > half of size of * query? We could rewind once we know the size ...) 
*/ - PortalRunSelect(portal, true, FETCH_ALL, None_Receiver); + PortalRunSelect(portal, true, + FETCH_ALL, 0L, 0, None_Receiver); if (count < -1) - PortalRunSelect(portal, false, -count - 1, None_Receiver); - return PortalRunSelect(portal, false, 1L, dest); + PortalRunSelect(portal, false, + -count - 1, 0, 0, None_Receiver); + return PortalRunSelect(portal, false, 1L, 0L, 0, dest); } else { /* count == 0 */ /* Rewind to start, return zero rows */ DoPortalRewind(portal); - return PortalRunSelect(portal, true, 0L, dest); + return PortalRunSelect(portal, true, 0L, 0L, 0, dest); } break; case FETCH_RELATIVE: @@ -1573,8 +1616,9 @@ DoPortalRunFetch(Portal portal, * Definition: advance count-1 rows, return next row (if any). */ if (count > 1) - PortalRunSelect(portal, true, count - 1, None_Receiver); - return PortalRunSelect(portal, true, 1L, dest); + PortalRunSelect(portal, true, + count - 1, 0L, 0, None_Receiver); + return PortalRunSelect(portal, true, 1L, 0L, 0, dest); } else if (count < 0) { @@ -1583,8 +1627,9 @@ DoPortalRunFetch(Portal portal, * any). 
*/ if (count < -1) - PortalRunSelect(portal, false, -count - 1, None_Receiver); - return PortalRunSelect(portal, false, 1L, dest); + PortalRunSelect(portal, false, + -count - 1, 0L, 0, None_Receiver); + return PortalRunSelect(portal, false, 1L, 0L, 0, dest); } else { @@ -1630,7 +1675,7 @@ DoPortalRunFetch(Portal portal, */ if (on_row) { - PortalRunSelect(portal, false, 1L, None_Receiver); + PortalRunSelect(portal, false, 1L, 0L, 0, None_Receiver); /* Set up to fetch one row forward */ count = 1; forward = true; @@ -1652,7 +1697,7 @@ DoPortalRunFetch(Portal portal, return result; } - return PortalRunSelect(portal, forward, count, dest); + return PortalRunSelect(portal, forward, count, size, tupoverhead, dest); } /* diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h index 55d483d..5f0c8f3 100644 --- a/src/include/access/htup_details.h +++ b/src/include/access/htup_details.h @@ -20,6 +20,7 @@ #include "access/transam.h" #include "storage/bufpage.h" +#include "executor/tuptable.h" /* * MaxTupleAttributeNumber limits the number of (user) columns in a tuple. 
* The key limit on this value is that the size of the fixed overhead for @@ -761,6 +762,7 @@ extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, /* prototypes for functions in common/heaptuple.c */ extern Size heap_compute_data_size(TupleDesc tupleDesc, Datum *values, bool *isnull); +extern Size slot_compute_raw_data_size(TupleTableSlot *slot); extern void heap_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull, char *data, Size data_size, diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 193a654..e2706a6 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -79,8 +79,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook; /* Hook for plugins to get control in ExecutorRun() */ typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc, - ScanDirection direction, - long count); + ScanDirection direction, + long count, long size, int tupoverhead); extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook; /* Hook for plugins to get control in ExecutorFinish() */ @@ -175,9 +175,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter, extern void ExecutorStart(QueryDesc *queryDesc, int eflags); extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags); extern void ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count); + ScanDirection direction, long count, long size, int tupoverhead); extern void standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count); + ScanDirection direction, long count, long size, int tupoverhead); extern void ExecutorFinish(QueryDesc *queryDesc); extern void standard_ExecutorFinish(QueryDesc *queryDesc); extern void ExecutorEnd(QueryDesc *queryDesc); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index cb8d854..f8121ec 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -410,6 +410,7 @@ typedef struct EState 
PreExecCallbackItem *es_preExecCallbacks; /* pre-exec callbacks */ uint32 es_processed; /* # of tuples processed */ + bool es_stoppedbysize; /* true if processing stopped by size */ Oid es_lastoid; /* last oid processed (by INSERT) */ int es_top_eflags; /* eflags passed to ExecutorStart */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 868905b..094c0ac 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2393,6 +2393,8 @@ typedef struct FetchStmt NodeTag type; FetchDirection direction; /* see above */ long howMany; /* number of rows, or position argument */ + long howLarge; /* total bytes of rows */ + int tupoverhead; /* declared overhead per tuple in client */ char *portalname; /* name of portal (cursor) */ bool ismove; /* TRUE if MOVE */ } FetchStmt; diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h index 8073a6e..021532c 100644 --- a/src/include/tcop/pquery.h +++ b/src/include/tcop/pquery.h @@ -17,7 +17,6 @@ #include "nodes/parsenodes.h" #include "utils/portal.h" - extern PGDLLIMPORT Portal ActivePortal; @@ -33,13 +32,15 @@ extern void PortalStart(Portal portal, ParamListInfo params, extern void PortalSetResultFormat(Portal portal, int nFormats, int16 *formats); -extern bool PortalRun(Portal portal, long count, bool isTopLevel, - DestReceiver *dest, DestReceiver *altdest, +extern bool PortalRun(Portal portal, long count, long size, int tupoverhead, + bool isTopLevel, DestReceiver *dest, DestReceiver *altdest, char *completionTag); extern long PortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, + int tupoverhead, DestReceiver *dest); #endif /* PQUERY_H */ diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons index b3b36cf..424f412 100644 --- a/src/interfaces/ecpg/preproc/ecpg.addons +++ b/src/interfaces/ecpg/preproc/ecpg.addons @@ -220,13 +220,56 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon ECPG: 
fetch_argsPRIORopt_from_incursor_name addon ECPG: fetch_argsFIRST_Popt_from_incursor_name addon ECPG: fetch_argsLAST_Popt_from_incursor_name addon + add_additional_variables($3, false); + if ($3[0] == ':') + { + free($3); + $3 = mm_strdup("$0"); + } ECPG: fetch_argsALLopt_from_incursor_name addon +ECPG: fetch_argsFORWARDopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDopt_from_incursor_name addon add_additional_variables($3, false); if ($3[0] == ':') { free($3); $3 = mm_strdup("$0"); } +ECPG: fetch_argsALLLIMITIconstopt_overheadopt_from_incursor_name addon + add_additional_variables($6, false); + if ($6[0] == ':') + { + free($6); + $6 = mm_strdup("$0"); + } + if ($3[0] == '$') + { + free($3); + $3 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } +ECPG: fetch_argsFORWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon + add_additional_variables($7, false); + if ($7[0] == ':') + { + free($7); + $7 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } + if ($5[0] == '$') + { + free($5); + $5 = mm_strdup("$0"); + } ECPG: fetch_argsSignedIconstopt_from_incursor_name addon add_additional_variables($3, false); if ($3[0] == ':') @@ -234,11 +277,51 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon free($3); $3 = mm_strdup("$0"); } +ECPG: fetch_argsSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon + add_additional_variables($6, false); + if ($6[0] == ':') + { + free($6); + $6 = mm_strdup("$0"); + } if ($1[0] == '$') { free($1); $1 = mm_strdup("$0"); } + if ($3[0] == '$') + { + free($3); + $3 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } +ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon + add_additional_variables($7, false); + 
if ($7[0] == ':') + { + free($7); + $7 = mm_strdup("$0"); + } + if ($2[0] == '$') + { + free($2); + $2 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } + if ($5[0] == '$') + { + free($5); + $5 = mm_strdup("$0"); + } ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon add_additional_variables($4, false); -- 1.8.3.1
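To illustrate the grammar added above: with the core patch applied, a client can bound a cursor fetch by bytes as well as rows, with an optional declared per-tuple overhead in parentheses. A sketch only — it assumes a server built with these patches, and the table and cursor names are placeholders:

```sql
BEGIN;
DECLARE cur CURSOR FOR SELECT * FROM some_table;

-- Up to 1000 rows, but stop early once the raw data sent, counting a
-- declared 32-byte per-tuple client overhead, would exceed 1048576 bytes.
FETCH FORWARD 1000 LIMIT 1048576 (32) FROM cur;

-- The parenthesized overhead is optional (opt_overhead defaults to 0).
FETCH ALL LIMIT 1048576 FROM cur;
COMMIT;
```

When the size limit, not the row count, ends the fetch, ExecutePlan sets es_stoppedbysize so that PortalRunSelect does not mark the portal atEnd and a subsequent FETCH can resume.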
>From 92596ee8cda043cbc37cd27d4ec668f624a49d6e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 30 Jun 2015 15:29:46 +0900
Subject: [PATCH] Add foreign table option to set fetch size.

---
 contrib/postgres_fdw/option.c       |  2 ++
 contrib/postgres_fdw/postgres_fdw.c | 31 +++++++++++++++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 7547ec2..793239d 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -153,6 +153,8 @@ InitPgFdwOptions(void)
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
+		/* fetch_size is available on table (XXX: also server may have it.) */
+		{"fetch_size", ForeignTableRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 40cac3b..e01213e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -152,6 +152,7 @@ typedef struct PgFdwScanState
 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
 	List	   *param_exprs;	/* executable expressions for param values */
 	const char **param_values;	/* textual values of query parameters */
+	int			fetch_size;		/* number of tuples to request by one fetch */
 
 	/* for storing result tuples */
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
@@ -945,6 +946,31 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
+	fsstate->fetch_size = -1;
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		/* Does anyone specify negatives? Who cares? */
+		if (strcmp(def->defname, "fetch_size") == 0)
+		{
+			char	   *ep = NULL;
+			char	   *defstr = defGetString(def);
+
+			fsstate->fetch_size = strtol(defstr, &ep, 10);
+			if (*ep || ep == defstr)
+			{
+				elog(WARNING,
+					 "Option \"%s\" must be a positive integer (foreign table \"%s\") : \"%s\"",
+					 def->defname, get_rel_name(table->relid), defstr);
+				fsstate->fetch_size = -1;	/* Use default */
+			}
+			break;
+		}
+	}
+	if (fsstate->fetch_size < 1)
+		fsstate->fetch_size = 100;		/* default size */
+
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
@@ -2092,15 +2118,12 @@ fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 {
 	PgFdwConn  *conn = fsstate->conn;
 	char		sql[64];
-	int			fetch_size;
+	int			fetch_size = fsstate->fetch_size;
 	int			numrows, addrows, restrows;
 	HeapTuple  *tmptuples;
 	int			i;
 	int			fetch_buf_size;
 
-	/* The fetch size is arbitrary, but shouldn't be enormous. */
-	fetch_size = 100;
-
 	/* Make the query to fetch tuples */
 	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 			 fetch_size, fsstate->cursor_number);
-- 
1.8.3.1
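The new per-table option is set through the usual foreign-table DDL. A sketch under the second patch; the table, column, and server names here are placeholders:

```sql
CREATE FOREIGN TABLE remote_tbl (id int, payload text)
    SERVER loopback
    OPTIONS (fetch_size '500');

-- or on an existing foreign table:
ALTER FOREIGN TABLE remote_tbl OPTIONS (ADD fetch_size '500');
```

Note that the option is table-only in this version (a server-level setting is left as the XXX comment says), and a value that fails the strtol() validation in postgresBeginForeignScan() only raises a WARNING and falls back to the default of 100.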
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers