Thank you for the comment.

The automatic way to determine the fetch_size looks like too much
for the purpose. A non-automatic alternative would be a new
foreign table option like 'fetch_size', but that exposes the
internals too much... Which do you think is preferable?

Thu, 22 Jan 2015 11:17:52 -0500, Tom Lane <t...@sss.pgh.pa.us> wrote in 
<24503.1421943...@sss.pgh.pa.us>
> Kyotaro HORIGUCHI <horiguchi.kyot...@lab.ntt.co.jp> writes:
> > Hello, as the discussion on async fetching on postgres_fdw, FETCH
> > with data-size limitation would be useful to get memory usage
> > stability of postgres_fdw.
> 
> > Could such a feature and syntax be allowed to be added?
> 
> This seems like a lot of work, and frankly an incredibly ugly API,
> for a benefit that is entirely hypothetical.  Have you got numbers
> showing any actual performance win for postgres_fdw?

The API was rushed work to make a path for the new parameter
(and, yes, I did more than needed for the purpose of use from
postgres_fdw..). It could take any saner syntax, but it's not
time for that yet.

A data-size limitation, whatever the limit, would give a
significant gain, especially for small rows.

This patch began from the observation that fetching runs about
twice as fast with fetch_size = 10000 as with 100.

http://www.postgresql.org/message-id/20150116.171849.109146500.horiguchi.kyot...@lab.ntt.co.jp

I measured execution times for retrieving 1M rows from localhost
via postgres_fdw and got the following numbers.

=# SELECT a from ft1;
fetch_size,   avg row size(*1),   time,   alloced_mem/fetch(Mbytes)(*1)
(local)                            0.75s
100            60                  6.2s       6000 (0.006)
10000          60                  2.7s     600000 (0.6  )
33333          60                  2.2s    1999980 (2.0  )
66666          60                  2.4s    3999960 (4.0  )

=# SELECT a, b, c from ft1;
fetch_size,   avg row size(*1),   time,   alloced_mem/fetch(Mbytes)(*1)
(local)                            0.8s
100           204                 12  s      20400 (0.02 )
1000          204                 10  s     204000 (0.2  )
10000         204                  5.8s    2040000 (2    )
20000         204                  5.9s    4080000 (4    )

=# SELECT a, b, d from ft1;
fetch_size,   avg row size(*1),   time,   alloced_mem/fetch(Mbytes)(*1)
(local)                            0.8s
100          1356                 17  s     135600 (0.136)
1000         1356                 15  s    1356000 (1.356)
1475         1356                 13  s    2000100 (2.0  )
2950         1356                 13  s    4000200 (4.0  )

The environment was set up as follows.

CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'localhost', 
dbname 'postgres');
CREATE USER MAPPING FOR PUBLIC SERVER sv1;
CREATE TABLE lt1 (a int, b timestamp, c text, d text);
CREATE FOREIGN TABLE ft1 (a int, b timestamp, c text, d text) SERVER sv1 
OPTIONS (table_name 'lt1');
INSERT INTO lt1 (SELECT a, now(), repeat('x', 128), repeat('x', 1280) FROM 
generate_series(0, 999999) a);
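
For reference, the timings above can be taken with psql's \timing
once this setup is in place (a sketch; it assumes a build with the
fetch size under test compiled into postgres_fdw):

```sql
\timing on
SELECT a FROM lt1;   -- local baseline: ~0.75s in the numbers above
SELECT a FROM ft1;   -- via postgres_fdw, varies with the fetch size
```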

The "avg row size" is alloced_mem/fetch_size, where alloced_mem
is the sum of HeapTuple[fetch_size] and (HEAPTUPLESIZE +
tup->t_len) over all tuples stored on the receiver side, in
fetch_more_data() in postgres_fdw.

That is about a 50% gain for the smaller tuple sizes and 25% for
the larger. The times look optimal where alloced_mem is around
2MB, for reasons unknown to me. Anyway, the difference seems
significant.

> Even if we wanted to do something like this, I strongly object to
> measuring size by heap_compute_data_size.  That's not a number that users
> would normally have any direct knowledge of; nor does it have anything
> at all to do with the claimed use-case, where what you'd really need to
> measure is bytes transmitted down the wire.  (The difference is not small:
> for instance, toasted values would likely still be toasted at the point
> where you're measuring.)

Sure. So I wrote the attached patch #1, which does the following
things.

 - The sender limits the number of tuples using the sum of the net
   lengths of the column values to be sent, not including protocol
   overhead. This is calculated in the added function
   slot_compute_attr_size(), using the raw length for compressed
   values.

 - postgres_fdw calculates the fetch limit in bytes by the
   following formula:

   MAX_FETCH_MEM - MAX_FETCH_SIZE * (estimated overhead per tuple);

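As a worked example of the formula above (the 32-byte per-tuple
overhead is a hypothetical value for illustration; the patch
computes the real one in estimate_tuple_overhead()):

```sql
-- 2MiB budget minus the overhead reserved for 30000 tuples:
SELECT (2 * 1024 * 1024) - (30000 * 32) AS fetch_limit_bytes;
-- 1137152, i.e. roughly 1.1MB of column data per batch
```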
The result with the patch is as follows, with MAX_FETCH_MEM = 2MiB
and MAX_FETCH_SIZE = 30000.

fetch_size,   avg row size(*1),   time,   max alloced_mem/fetch(Mbytes)
(auto)         60                  2.4s   1080000 ( 1.08)
(auto)        204                  7.3s    536400 ( 0.54)
(auto)       1356                 15  s    430236 ( 0.43)

This is meaningfully faster, but the patch looks too big and the
meaning of the new parameter is hard to understand.. :(
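
Concretely, the syntax patch #1 adds to FETCH (per the gram.y
hunks below) is used like this; the cursor name c1 is illustrative:

```sql
-- Fetch up to 30000 rows, but stop early once about 2MB of
-- column data has been sent:
FETCH 30000 LIMIT 2097152 FROM c1;
FETCH ALL LIMIT 2097152 FROM c1;
FETCH BACKWARD ALL LIMIT 2097152 FROM c1;
```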


On the other hand, the cause of the variation in alloced_mem
shown above is per-tuple overhead, the total of which is unknown
before execution.  The second patch makes FETCH accept the tuple
overhead in bytes. The result seems pretty good, but I think this
might be too specialized to this usage.

MAX_FETCH_SIZE = 30000 and MAX_FETCH_MEM = 2MiB,
max_fetch_size,   avg row size(*1),   time,   max alloced_mem/fetch(MiBytes)
30000             60                  2.3s   1080000 ( 1.0)
 9932            204                  5.7s   1787760 ( 1.7)
 1376           1356                 13  s   1847484 ( 1.8)

MAX_FETCH_SIZE = 25000 and MAX_FETCH_MEM = 1MiB,
max_fetch_size,   avg row size(*1),   time,   max alloced_mem/fetch(MiBytes)
25000             60                  2.4s    900000 ( 0.86)
 4358            204                  6.6s    816840 ( 0.78)
  634           1356                 16  s    844488 ( 0.81)

MAX_FETCH_SIZE = 10000 and MAX_FETCH_MEM = 0.5MiB,
max_fetch_size,   avg row size(*1),   time,   max alloced_mem/fetch(MiBytes)
10000             60                  2.8s    360000 ( 0.35)
 2376            204                  7.8s    427680 ( 0.41)
  332           1356                 17  s    442224 ( 0.42)

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 4e5937d33e92d908462d567fa3264ae11404ecac Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 21 Jan 2015 17:18:09 +0900
Subject: [PATCH 1/2] Size limitation feature of FETCH v1

- Row size calculation is based on detoasted size.
---
 contrib/auto_explain/auto_explain.c             |   8 +-
 contrib/pg_stat_statements/pg_stat_statements.c |   8 +-
 contrib/postgres_fdw/postgres_fdw.c             | 103 +++++++++++++++++++-----
 src/backend/access/common/heaptuple.c           |  36 +++++++++
 src/backend/commands/copy.c                     |   2 +-
 src/backend/commands/createas.c                 |   2 +-
 src/backend/commands/explain.c                  |   2 +-
 src/backend/commands/extension.c                |   2 +-
 src/backend/commands/matview.c                  |   2 +-
 src/backend/commands/portalcmds.c               |   3 +-
 src/backend/commands/prepare.c                  |   2 +-
 src/backend/executor/execMain.c                 |  33 ++++++--
 src/backend/executor/execUtils.c                |   1 +
 src/backend/executor/functions.c                |   2 +-
 src/backend/executor/spi.c                      |   4 +-
 src/backend/parser/gram.y                       |  54 +++++++++++++
 src/backend/tcop/postgres.c                     |   2 +
 src/backend/tcop/pquery.c                       |  87 +++++++++++++-------
 src/include/access/htup_details.h               |   2 +
 src/include/executor/executor.h                 |   8 +-
 src/include/nodes/execnodes.h                   |   1 +
 src/include/nodes/parsenodes.h                  |   1 +
 src/include/tcop/pquery.h                       |   3 +-
 src/interfaces/ecpg/preproc/Makefile            |   2 +-
 src/interfaces/ecpg/preproc/ecpg.addons         |  63 +++++++++++++++
 25 files changed, 353 insertions(+), 80 deletions(-)

diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..f121a33 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -57,7 +57,7 @@ void		_PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
 					ScanDirection direction,
-					long count);
+					long count, long size);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nesting_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nesting_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 2629bfc..a68c11d 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -282,7 +282,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction,
-				 long count);
+				 long count, long size);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(Node *parsetree, const char *queryString,
@@ -863,15 +863,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nested_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nested_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..b3bf27e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -46,6 +46,11 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Maximum tuples per fetch */
+#define MAX_FETCH_SIZE				30000
+
+/* Maximum memory usable for retrieved data  */
+#define MAX_FETCH_MEM				(2 * 1024 * 1024)
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -156,6 +161,8 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+
+	long		max_palloced_mem; /* For test, remove me later */
 } PgFdwScanState;
 
 /*
@@ -321,6 +328,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 							  double *totaldeadrows);
 static void analyze_row_processor(PGresult *res, int row,
 					  PgFdwAnalyzeState *astate);
+static Size estimate_tuple_overhead(TupleDesc tupDesc,
+									List *retrieved_attrs);
 static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   int row,
 						   Relation rel,
@@ -1095,6 +1104,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	if (fsstate == NULL)
 		return;
 
+	elog(LOG, "Max memory for tuple store = %ld", fsstate->max_palloced_mem);
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
 		close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2029,12 +2039,18 @@ fetch_more_data(ForeignScanState *node)
 		int			fetch_size;
 		int			numrows;
 		int			i;
+		long		alloc_size = 0;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
-
-		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+		fetch_size = MAX_FETCH_MEM -
+			MAX_FETCH_SIZE *
+				estimate_tuple_overhead(fsstate->attinmeta->tupdesc,
+										fsstate->retrieved_attrs);
+
+		snprintf(sql, sizeof(sql), "FETCH %d LIMIT %d FROM c%u",
+				 MAX_FETCH_SIZE,
+				 fetch_size,
+				 fsstate->cursor_number);
 
 		res = PQexec(conn, sql);
 		/* On error, report the original query, not the FETCH. */
@@ -2043,27 +2059,34 @@ fetch_more_data(ForeignScanState *node)
 
 		/* Convert the data into HeapTuples */
 		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
-
-		for (i = 0; i < numrows; i++)
+		if (numrows == 0)
+			fsstate->eof_reached = true;
+		else
 		{
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   fsstate->temp_cxt);
-		}
+			alloc_size = numrows * sizeof(HeapTuple);
+			fsstate->tuples = (HeapTuple *) palloc0(alloc_size);
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
 
-		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
-			fsstate->fetch_ct_2++;
+			for (i = 0; i < numrows; i++)
+			{
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											   fsstate->rel,
+											   fsstate->attinmeta,
+											   fsstate->retrieved_attrs,
+											   fsstate->temp_cxt);
+				alloc_size += fsstate->tuples[i]->t_len;
+			}
 
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+			if (alloc_size > fsstate->max_palloced_mem)
+				fsstate->max_palloced_mem = alloc_size;
 
+			/* Update fetch_ct_2 */
+			if (fsstate->fetch_ct_2 < 2)
+				fsstate->fetch_ct_2++;
+		}
+			
 		PQclear(res);
 		res = NULL;
 	}
@@ -2835,6 +2858,44 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 }
 
 /*
+ * Compute the estimated overhead of the result tuples
+ * See heap_form_tuple for the details of this calculation.
+ */
+static Size
+estimate_tuple_overhead(TupleDesc tupDesc,
+						List *retrieved_attrs)
+{
+	Size size = 0;
+	int	 ncol = list_length(retrieved_attrs);
+	int  nadded = 0;
+	ListCell	*lc;
+
+	size += offsetof(HeapTupleHeaderData, t_bits);
+	size += BITMAPLEN(ncol);
+
+	if (tupDesc->tdhasoid)
+		size += sizeof(Oid);
+
+	size = MAXALIGN(size);
+
+	size += sizeof(Datum) * ncol;
+	size += sizeof(bool) * ncol;
+
+	foreach (lc, retrieved_attrs)
+	{
+		int i = lfirst_int(lc);
+
+		if (i > 0)
+		{
+			if (tupDesc->attrs[i - 1]->attbyval)
+				size -= (sizeof(Datum) - tupDesc->attrs[i - 1]->attlen);
+		}
+	}
+
+	return size;
+}
+
+/*
  * Create a tuple from the specified row of the PGresult.
  *
  * rel is the local representation of the foreign table, attinmeta is
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index 867035d..2a577e5 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -60,6 +60,7 @@
 #include "access/sysattr.h"
 #include "access/tuptoaster.h"
 #include "executor/tuptable.h"
+#include "utils/pg_lzcompress.h"
 
 
 /* Does att's datatype allow packing into the 1-byte-header varlena format? */
@@ -120,6 +121,41 @@ heap_compute_data_size(TupleDesc tupleDesc,
 	return data_length;
 }
 
+Size
+slot_compute_attr_size(TupleTableSlot *slot)
+{
+	TupleDesc	 tupleDesc = slot->tts_tupleDescriptor;
+	Datum		*values = slot->tts_values;
+	bool		*isnull = slot->tts_isnull;
+	int			 nattrs = tupleDesc->natts;
+	int i;
+	Size		 sumattlen = 0;
+
+	if (slot->tts_nvalid < nattrs)
+	{
+		/*  We need all attributes deformed */
+		slot_getallattrs(slot);
+	}
+	for (i = 0 ; i < nattrs ; i++)
+	{
+		Form_pg_attribute thisatt = tupleDesc->attrs[i];
+
+		if (isnull[i]) continue;
+
+		if (thisatt->attbyval)
+			sumattlen += thisatt->attlen;
+		else if (VARATT_IS_COMPRESSED(values[i]))
+		{
+			sumattlen += PGLZ_RAW_SIZE((PGLZ_Header *)values[i]);
+		}
+		else if (VARATT_IS_SHORT(values[i]))
+			sumattlen += VARSIZE_SHORT(values[i]) - VARHDRSZ_SHORT;
+		else
+			sumattlen += VARSIZE(values[i]) - VARHDRSZ;
+	}
+	return sumattlen;
+}
+
 /*
  * heap_fill_tuple
  *		Load data portion of a tuple from values/isnull arrays
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0e604b7..b6e6523 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index abc0fe8..c5c4478 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7cfc9bb..2c23e9b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 3b95552..f624567 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 74415b8..6530ecb 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 2794537..255c86e 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -177,6 +177,7 @@ PerformPortalFetch(FetchStmt *stmt,
 	nprocessed = PortalRunFetch(portal,
 								stmt->direction,
 								stmt->howMany,
+								stmt->howLarge,
 								dest);
 
 	/* Return command status if wanted */
@@ -375,7 +376,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 71b08f0..31799f5 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b9f21c5..d976bf3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -78,6 +78,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -248,17 +249,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count)
+			ScanDirection direction, long count, long size)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count);
+		(*ExecutorRun_hook) (queryDesc, direction, count, size);
 	else
-		standard_ExecutorRun(queryDesc, direction, count);
+		standard_ExecutorRun(queryDesc, direction, count, size);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count)
+					 ScanDirection direction, long count, long size)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -310,6 +311,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					operation,
 					sendTuples,
 					count,
+					size,
 					direction,
 					dest);
 
@@ -1450,22 +1452,26 @@ ExecutePlan(EState *estate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
 	TupleTableSlot *slot;
 	long		current_tuple_count;
+	long		sent_size;
 
 	/*
 	 * initialize local variables
 	 */
 	current_tuple_count = 0;
-
+	sent_size = 0;
 	/*
 	 * Set the direction.
 	 */
 	estate->es_direction = direction;
 
+	estate->es_stoppedbysize = false;
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
@@ -1520,6 +1526,23 @@ ExecutePlan(EState *estate,
 		current_tuple_count++;
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
+
+		if (sizeTuples > 0)
+		{
+			/*
+			 * Count the size of tuples we've sent
+			 *
+			 * This needs all attributes deformed so a bit slow on some cases.
+			 */
+			sent_size += slot_compute_attr_size(slot);
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (sizeTuples < sent_size)
+			{
+				estate->es_stoppedbysize = true;
+				break;
+			}
+		}
 	}
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 32697dd..ff2c395 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -133,6 +133,7 @@ CreateExecutorState(void)
 	estate->es_rowMarks = NIL;
 
 	estate->es_processed = 0;
+	estate->es_stoppedbysize = false;
 	estate->es_lastoid = InvalidOid;
 
 	estate->es_top_eflags = 0;
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 84be37c..d64e908 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4b86e91..cb30cfb 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count,
+							  count, 0L,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 36dac29..e559d1a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -6021,6 +6021,15 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
+			| SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $1;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6029,6 +6038,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| FORWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6045,6 +6063,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| FORWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6053,6 +6080,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| FORWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6069,6 +6105,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6077,6 +6122,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| BACKWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 		;
 
 from_in:	FROM									{}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f74353..55f062b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1043,6 +1043,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
+						 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1928,6 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
+						  0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 9c14e8a..1456c5a 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/htup_details.h"
 #include "commands/prepare.h"
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
@@ -39,9 +40,10 @@ static void ProcessQuery(PlannedStmt *plan,
 			 DestReceiver *dest,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
-static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
+static uint32 RunFromStore(Portal portal, ScanDirection direction,
+		     long count, long size, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count,
+static long PortalRunSelect(Portal portal, bool forward, long count, long size,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -51,6 +53,7 @@ static void PortalRunMulti(Portal portal, bool isTopLevel,
 static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -182,7 +185,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -703,7 +706,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
@@ -787,7 +790,7 @@ PortalRun(Portal portal, long count, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, dest);
+				nprocessed = PortalRunSelect(portal, true, count, size, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -892,11 +895,13 @@ static long
 PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
+				long size,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
 	ScanDirection direction;
 	uint32		nprocessed;
+	bool		stoppedbysize;
 
 	/*
 	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
@@ -939,12 +944,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -954,8 +961,9 @@ PortalRunSelect(Portal portal,
 
 			if (nprocessed > 0)
 				portal->atStart = false;		/* OK to go backward now */
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 				portal->atEnd = true;	/* we retrieved 'em all */
 			oldPos = portal->portalPos;
 			portal->portalPos += nprocessed;
@@ -982,12 +990,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -998,8 +1008,9 @@ PortalRunSelect(Portal portal,
 				portal->atEnd = false;	/* OK to go forward now */
 				portal->portalPos++;	/* adjust for endpoint case */
 			}
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 			{
 				portal->atStart = true; /* we retrieved 'em all */
 				portal->portalPos = 0;
@@ -1089,10 +1100,13 @@ FillPortalStore(Portal portal, bool isTopLevel)
  */
 static uint32
 RunFromStore(Portal portal, ScanDirection direction, long count,
-			 DestReceiver *dest)
+			 long size_limit, bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
+	long		sent_size = 0;
+
+	*stoppedbysize = false;
 
 	slot = MakeSingleTupleTableSlot(portal->tupDesc);
 
@@ -1123,6 +1137,9 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 
 			(*dest->receiveSlot) (slot, dest);
 
+			/* Count the size of tuples we've sent */
+			sent_size += slot_compute_attr_size(slot);
+
 			ExecClearTuple(slot);
 
 			/*
@@ -1133,6 +1150,14 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			current_tuple_count++;
 			if (count && count == current_tuple_count)
 				break;
+
+			/* Quit once the size limit has been exceeded by the tuples sent */
+			if (current_tuple_count > 0 &&
+				size_limit > 0 && size_limit < sent_size)
+			{
+				*stoppedbysize = true;
+				break;
+			}
 		}
 	}
 
@@ -1385,6 +1410,7 @@ long
 PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1422,7 +1448,7 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1439,7 +1465,7 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			default:
@@ -1484,6 +1510,7 @@ static long
 DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1526,7 +1553,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1,
+						PortalRunSelect(portal, true, count - 1, 0L,
 										None_Receiver);
 				}
 				else
@@ -1536,13 +1563,13 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1,
+						PortalRunSelect(portal, false, pos - count + 1, 0L,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1,
+						PortalRunSelect(portal, true, count - pos - 1, 0L,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1553,17 +1580,17 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
+				PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1573,8 +1600,8 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, dest);
+					PortalRunSelect(portal, true, count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1583,8 +1610,8 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
@@ -1630,7 +1657,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1652,7 +1679,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, dest);
+	return PortalRunSelect(portal, forward, count, size, dest);
 }
 
 /*
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index d2ad910..2eeba00 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -20,6 +20,7 @@
 #include "access/transam.h"
 #include "storage/bufpage.h"
 
+#include "executor/tuptable.h"
 /*
  * MaxTupleAttributeNumber limits the number of (user) columns in a tuple.
  * The key limit on this value is that the size of the fixed overhead for
@@ -723,6 +724,7 @@ extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
 /* prototypes for functions in common/heaptuple.c */
 extern Size heap_compute_data_size(TupleDesc tupleDesc,
 					   Datum *values, bool *isnull);
+extern Size slot_compute_attr_size(TupleTableSlot *slot);
 extern void heap_fill_tuple(TupleDesc tupleDesc,
 				Datum *values, bool *isnull,
 				char *data, Size data_size,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 40fde83..64a02c3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -80,8 +80,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
-												   ScanDirection direction,
-												   long count);
+									   ScanDirection direction,
+									   long count, long size);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count);
+			ScanDirection direction, long count, long size);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count);
+					 ScanDirection direction, long count, long size);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 41288ed..d963286 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -376,6 +376,7 @@ typedef struct EState
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
 	uint32		es_processed;	/* # of tuples processed */
+	bool		es_stoppedbysize; /* true if processing stopped by size */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b1dfa85..9e18331 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2223,6 +2223,7 @@ typedef struct FetchStmt
 	NodeTag		type;
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
+	long		howLarge;		/* max total size of rows, in bytes */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 8073a6e..afffe86 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -33,13 +33,14 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, bool isTopLevel,
+extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile
index 1ecc405..b492fa7 100644
--- a/src/interfaces/ecpg/preproc/Makefile
+++ b/src/interfaces/ecpg/preproc/Makefile
@@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport
 preproc.o: pgc.c
 
 preproc.h: preproc.c ;
-preproc.c: BISONFLAGS += -d
+preproc.c: BISONFLAGS += -r all -d
 
 preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type
 	$(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index b3b36cf..bdccb68 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -220,13 +220,46 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon
 ECPG: fetch_argsPRIORopt_from_incursor_name addon
 ECPG: fetch_argsFIRST_Popt_from_incursor_name addon
 ECPG: fetch_argsLAST_Popt_from_incursor_name addon
+		add_additional_variables($3, false);
+		if ($3[0] == ':')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
 ECPG: fetch_argsALLopt_from_incursor_name addon
+ECPG: fetch_argsFORWARDopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -234,11 +267,41 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 		if ($1[0] == '$')
 		{
 			free($1);
 			$1 = mm_strdup("$0");
 		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($2[0] == '$')
+		{
+			free($2);
+			$2 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
2.1.0.GIT

From 1b595089408f47bafa96f8c86ddbc7a5728f0e5e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 27 Jan 2015 11:15:17 +0900
Subject: [PATCH 2/2] Make FETCH accept a per-tuple memory overhead.

---
 contrib/postgres_fdw/postgres_fdw.c     | 25 ++++++-----
 src/backend/commands/copy.c             |  2 +-
 src/backend/commands/createas.c         |  2 +-
 src/backend/commands/explain.c          |  2 +-
 src/backend/commands/extension.c        |  2 +-
 src/backend/commands/matview.c          |  2 +-
 src/backend/commands/portalcmds.c       |  3 +-
 src/backend/commands/prepare.c          |  2 +-
 src/backend/executor/execMain.c         | 16 ++++---
 src/backend/executor/functions.c        |  2 +-
 src/backend/executor/spi.c              |  4 +-
 src/backend/parser/gram.y               | 35 ++++++++++-----
 src/backend/tcop/postgres.c             |  4 +-
 src/backend/tcop/pquery.c               | 79 ++++++++++++++++++++-------------
 src/include/executor/executor.h         |  6 +--
 src/include/nodes/parsenodes.h          |  1 +
 src/include/tcop/pquery.h               |  6 +--
 src/interfaces/ecpg/preproc/Makefile    |  2 +-
 src/interfaces/ecpg/preproc/ecpg.addons | 64 +++++++++++++++++---------
 19 files changed, 159 insertions(+), 100 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b3bf27e..6633912 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -47,10 +47,10 @@ PG_MODULE_MAGIC;
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
 /* Maximum tuples per fetch */
-#define MAX_FETCH_SIZE				30000
+#define MAX_FETCH_SIZE				10000
 
 /* Maximum memory usable for retrieved data  */
-#define MAX_FETCH_MEM				(2 * 1024 * 1024)
+#define MAX_FETCH_MEM				(512 * 1024)
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -163,6 +163,7 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	long		max_palloced_mem; /* For test, remove me later */
+	int			max_numrows;
 } PgFdwScanState;
 
 /*
@@ -1104,7 +1105,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	if (fsstate == NULL)
 		return;
 
-	elog(LOG, "Max memory fo tuple store = %ld", fsstate->max_palloced_mem);
+	elog(LOG, "Max memory for tuple store = %ld, max numrows = %d", fsstate->max_palloced_mem, fsstate->max_numrows);
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
 		close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2037,19 +2038,18 @@ fetch_more_data(ForeignScanState *node)
 		PGconn	   *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
+		int			tuple_overhead;
 		int			numrows;
 		int			i;
 		long		alloc_size = 0;
 
 		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = MAX_FETCH_MEM -
-			MAX_FETCH_SIZE *
-				estimate_tuple_overhead(fsstate->attinmeta->tupdesc,
-										fsstate->retrieved_attrs);
-
-		snprintf(sql, sizeof(sql), "FETCH %d LIMIT %ld FROM c%u",
+		tuple_overhead = estimate_tuple_overhead(fsstate->attinmeta->tupdesc,
+												 fsstate->retrieved_attrs);
+		fetch_size = MAX_FETCH_MEM - MAX_FETCH_SIZE * sizeof(HeapTuple);
+		snprintf(sql, sizeof(sql), "FETCH %d LIMIT %d (%d) FROM c%u",
 				 MAX_FETCH_SIZE,
-				 fetch_size,
+				 fetch_size, tuple_overhead,
 				 fsstate->cursor_number);
 
 		res = PQexec(conn, sql);
@@ -2059,6 +2059,9 @@ fetch_more_data(ForeignScanState *node)
 
 		/* Convert the data into HeapTuples */
 		numrows = PQntuples(res);
+		if (fsstate->max_numrows < numrows)
+			fsstate->max_numrows = numrows;
+
 		if (numrows == 0)
 			fsstate->eof_reached = true;
 		else
@@ -2079,7 +2082,7 @@ fetch_more_data(ForeignScanState *node)
 				alloc_size += fsstate->tuples[i]->t_len;
 			}
 
-			if (alloc_size > fsstate->max_palloced_mem)
+			if (fsstate->max_palloced_mem < alloc_size)
 				fsstate->max_palloced_mem = alloc_size;
 
 			/* Update fetch_ct_2 */
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b6e6523..6ddae82 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L, 0);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index c5c4478..1644f86 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L, 0);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 2c23e9b..1b423ee 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L, 0);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index f624567..2360ffa 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0L, 0L, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6530ecb..54669c5 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 255c86e..85fffc1 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -178,6 +178,7 @@ PerformPortalFetch(FetchStmt *stmt,
 								stmt->direction,
 								stmt->howMany,
 								stmt->howLarge,
+								stmt->tupoverhead,
 								dest);
 
 	/* Return command status if wanted */
@@ -376,7 +377,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 31799f5..e46367a 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, 0L, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, 0, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index d976bf3..b40702c 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -79,6 +79,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			bool sendTuples,
 			long numberTuples,
 			long sizeTuples,
+			int  tupleOverhead,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -249,17 +250,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count, long size)
+			ScanDirection direction, long count, long size, int tupoverhead)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count, size);
+		(*ExecutorRun_hook) (queryDesc, direction,
+							 count, size, tupoverhead);
 	else
-		standard_ExecutorRun(queryDesc, direction, count, size);
+		standard_ExecutorRun(queryDesc, direction,
+							 count, size, tupoverhead);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count, long size)
+					 ScanDirection direction,
+					 long count, long size, int tupoverhead)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -312,6 +316,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					sendTuples,
 					count,
 					size,
+					tupoverhead,
 					direction,
 					dest);
 
@@ -1453,6 +1458,7 @@ ExecutePlan(EState *estate,
 			bool sendTuples,
 			long numberTuples,
 			long sizeTuples,
+			int  tupleOverhead,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
@@ -1534,7 +1540,7 @@ ExecutePlan(EState *estate,
 			 *
 			 * This needs all attributes deformed, so it can be a bit slow in some cases.
 			 */
-			sent_size += slot_compute_attr_size(slot);
+			sent_size += slot_compute_attr_size(slot) + tupleOverhead;
 
 			/* Quit once the size limit has been exceeded by the tuples sent */
 			if (sizeTuples < sent_size)
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index d64e908..9b46c95 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count, 0L);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L, 0);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index cb30cfb..889a65c 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L, 0);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count, 0L,
+							  count, 0L, 0,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e559d1a..4507ea2 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -520,6 +520,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 
+%type <ival>	opt_overhead
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -6021,13 +6023,14 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
-			| SignedIconst LIMIT Iconst opt_from_in cursor_name
+			| SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $5;
+					n->portalname = $6;
 					n->direction = FETCH_FORWARD;
 					n->howMany = $1;
 					n->howLarge = $3;
+					n->tupoverhead = $4;
 					$$ = (Node *)n;
 				}
 			| ALL opt_from_in cursor_name
@@ -6038,13 +6041,14 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
-			| ALL LIMIT Iconst opt_from_in cursor_name
+			| ALL LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $5;
+					n->portalname = $6;
 					n->direction = FETCH_FORWARD;
 					n->howMany = FETCH_ALL;
 					n->howLarge = $3;
+					n->tupoverhead = $4;
 					$$ = (Node *)n;
 				}
 			| FORWARD opt_from_in cursor_name
@@ -6063,13 +6067,14 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
-			| FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+			| FORWARD SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $6;
+					n->portalname = $7;
 					n->direction = FETCH_FORWARD;
 					n->howMany = $2;
 					n->howLarge = $4;
+					n->tupoverhead = $5;
 					$$ = (Node *)n;
 				}
 			| FORWARD ALL opt_from_in cursor_name
@@ -6080,13 +6085,14 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
-			| FORWARD ALL LIMIT Iconst opt_from_in cursor_name
+			| FORWARD ALL LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $6;
+					n->portalname = $7;
 					n->direction = FETCH_FORWARD;
 					n->howMany = FETCH_ALL;
 					n->howLarge = $4;
+					n->tupoverhead = $5;
 					$$ = (Node *)n;
 				}
 			| BACKWARD opt_from_in cursor_name
@@ -6105,13 +6111,14 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
-			| BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+			| BACKWARD SignedIconst LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $6;
+					n->portalname = $7;
 					n->direction = FETCH_BACKWARD;
 					n->howMany = $2;
 					n->howLarge = $4;
+					n->tupoverhead = $5;
 					$$ = (Node *)n;
 				}
 			| BACKWARD ALL opt_from_in cursor_name
@@ -6122,13 +6129,14 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
-			| BACKWARD ALL LIMIT Iconst opt_from_in cursor_name
+			| BACKWARD ALL LIMIT Iconst opt_overhead opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
-					n->portalname = $6;
+					n->portalname = $7;
 					n->direction = FETCH_BACKWARD;
 					n->howMany = FETCH_ALL;
 					n->howLarge = $4;
+					n->tupoverhead = $5;
 					$$ = (Node *)n;
 				}
 		;
@@ -6141,6 +6149,9 @@ opt_from_in:	from_in								{}
 			| /* EMPTY */							{}
 		;
 
+opt_overhead:	'(' Iconst ')'						{ $$ = $2; }
+			| /* EMPTY */							{ $$ = 0; }
+		;
 
 /*****************************************************************************
  *
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 55f062b..5261197 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1043,7 +1043,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
-						 0,
+						 0L, 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1929,7 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
-						  0,
+						  0L, 0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 1456c5a..6628b19 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -41,9 +41,10 @@ static void ProcessQuery(PlannedStmt *plan,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
 static uint32 RunFromStore(Portal portal, ScanDirection direction,
-		     long count, long size, bool *stoppedbysize,
+		     long count, long size, int tupoverhead, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count, long size,
+static long PortalRunSelect(Portal portal, bool forward,
+				long count, long size, int tupoverhead,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -54,6 +55,7 @@ static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
 				 long size,
+				 int tupoverhead,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -185,7 +187,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L, 0);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -706,8 +708,8 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, long size, bool isTopLevel,
-		  DestReceiver *dest, DestReceiver *altdest,
+PortalRun(Portal portal, long count, long size, int tupoverhead,
+		  bool isTopLevel, DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
 	bool		result;
@@ -790,7 +792,8 @@ PortalRun(Portal portal, long count, long size, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, size, dest);
+				nprocessed = PortalRunSelect(portal, true,
+											 count, size, tupoverhead, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -896,6 +899,7 @@ PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
 				long size,
+				int  tupoverhead,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
@@ -944,12 +948,13 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count,
-									  size, &stoppedbysize, dest);
+			nprocessed = RunFromStore(portal, direction,
+									  count, size, tupoverhead,
+									  &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count, size);
+			ExecutorRun(queryDesc, direction, count, size, tupoverhead);
 			nprocessed = queryDesc->estate->es_processed;
 			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
@@ -990,12 +995,13 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count,
-									  size, &stoppedbysize, dest);
+			nprocessed = RunFromStore(portal, direction,
+									  count, size, tupoverhead,
+									  &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count, size);
+			ExecutorRun(queryDesc, direction, count, size, tupoverhead);
 			nprocessed = queryDesc->estate->es_processed;
 			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
@@ -1099,8 +1105,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
  * out for memory leaks.
  */
 static uint32
-RunFromStore(Portal portal, ScanDirection direction, long count,
-			 long size_limit, bool *stoppedbysize, DestReceiver *dest)
+RunFromStore(Portal portal, ScanDirection direction,
+			 long count, long size_limit, int tupoverhead,
+			 bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
@@ -1138,7 +1145,7 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			(*dest->receiveSlot) (slot, dest);
 
 			/* Count the size of tuples we've sent */
-			sent_size += slot_compute_attr_size(slot);
+			sent_size += slot_compute_attr_size(slot) + tupoverhead;
 
 			ExecClearTuple(slot);
 
@@ -1411,6 +1418,7 @@ PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
 			   long size,
+			   int  tupoverhead,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1448,7 +1456,8 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
+				result = DoPortalRunFetch(portal, fdirection,
+										  count, size, tupoverhead, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1465,7 +1474,8 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
+				result = DoPortalRunFetch(portal, fdirection,
+										  count, size, tupoverhead, dest);
 				break;
 
 			default:
@@ -1511,6 +1521,7 @@ DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
 				 long size,
+				 int  tupoverhead,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1553,7 +1564,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1, 0L,
+						PortalRunSelect(portal, true, count - 1, 0L, 0,
 										None_Receiver);
 				}
 				else
@@ -1563,13 +1574,15 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1, 0L,
+						PortalRunSelect(portal, false,
+										pos - count + 1, 0L, 0,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1, 0L,
+						PortalRunSelect(portal, true,
+										count - pos - 1, 0L, 0,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, 0L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, 0, dest);
 			}
 			else if (count < 0)
 			{
@@ -1580,17 +1593,19 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver);
+				PortalRunSelect(portal, true,
+								FETCH_ALL, 0L, 0, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, 0, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, 0L, dest);
+					PortalRunSelect(portal, false,
+									-count - 1, 0, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, 0, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, 0, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1600,8 +1615,9 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, 0L, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, 0L, dest);
+					PortalRunSelect(portal, true,
+									count - 1, 0L, 0, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, 0, dest);
 			}
 			else if (count < 0)
 			{
@@ -1610,8 +1626,9 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, 0L, dest);
+					PortalRunSelect(portal, false,
+									-count - 1, 0L, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, 0, dest);
 			}
 			else
 			{
@@ -1657,7 +1674,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, 0L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, 0, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1679,7 +1696,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, size, dest);
+	return PortalRunSelect(portal, forward, count, size, tupoverhead, dest);
 }
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 64a02c3..316568f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -81,7 +81,7 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
 									   ScanDirection direction,
-									   long count, long size);
+									   long count, long size, int tupoverhead);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count, long size);
+		ScanDirection direction, long count, long size, int tupoverhead);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-		    ScanDirection direction, long count, long size);
+		 ScanDirection direction, long count, long size, int tupoverhead);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 9e18331..f86694b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2224,6 +2224,7 @@ typedef struct FetchStmt
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
 	long		howLarge;		/* total bytes of rows */
+	int			tupoverhead;	/* declared overhead per tuple in client */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index afffe86..021532c 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -17,7 +17,6 @@
 #include "nodes/parsenodes.h"
 #include "utils/portal.h"
 
-
 extern PGDLLIMPORT Portal ActivePortal;
 
 
@@ -33,14 +32,15 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel,
-		  DestReceiver *dest, DestReceiver *altdest,
+extern bool PortalRun(Portal portal, long count, long size, int tupoverhead,
+		  bool isTopLevel, DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
 			   long size,
+			   int tupoverhead,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile
index b492fa7..1ecc405 100644
--- a/src/interfaces/ecpg/preproc/Makefile
+++ b/src/interfaces/ecpg/preproc/Makefile
@@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport
 preproc.o: pgc.c
 
 preproc.h: preproc.c ;
-preproc.c: BISONFLAGS += -r all -d
+preproc.c: BISONFLAGS += -d
 
 preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type
 	$(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index bdccb68..424f412 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -235,31 +235,41 @@ ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
-ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon
-		add_additional_variables($5, false);
-		if ($5[0] == ':')
+ECPG: fetch_argsALLLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
 		{
-			free($5);
-			$5 = mm_strdup("$0");
+			free($6);
+			$6 = mm_strdup("$0");
 		}
 		if ($3[0] == '$')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
-ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon
-ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon
-		add_additional_variables($6, false);
-		if ($6[0] == ':')
+		if ($4[0] == '$')
 		{
-			free($6);
-			$6 = mm_strdup("$0");
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($7, false);
+		if ($7[0] == ':')
+		{
+			free($7);
+			$7 = mm_strdup("$0");
 		}
 		if ($4[0] == '$')
 		{
 			free($4);
 			$4 = mm_strdup("$0");
 		}
+		if ($5[0] == '$')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -267,12 +277,12 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
-ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
-		add_additional_variables($5, false);
-		if ($5[0] == ':')
+ECPG: fetch_argsSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
 		{
-			free($5);
-			$5 = mm_strdup("$0");
+			free($6);
+			$6 = mm_strdup("$0");
 		}
 		if ($1[0] == '$')
 		{
@@ -284,13 +294,18 @@ ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
-ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon
-ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
-		add_additional_variables($6, false);
-		if ($6[0] == ':')
+		if ($4[0] == '$')
 		{
-			free($6);
-			$6 = mm_strdup("$0");
+			free($4);
+			$4 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_overheadopt_from_incursor_name addon
+		add_additional_variables($7, false);
+		if ($7[0] == ':')
+		{
+			free($7);
+			$7 = mm_strdup("$0");
 		}
 		if ($2[0] == '$')
 		{
@@ -302,6 +317,11 @@ ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
 			free($4);
 			$4 = mm_strdup("$0");
 		}
+		if ($5[0] == '$')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
2.1.0.GIT

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)