Oops! I'm sorry: I posted a reversed patch for the first modification.

This is an amendment to the message below. Its body text is
copied into this message.

http://archives.postgresql.org/message-id/20111201.192419.103527179.horiguchi.kyot...@oss.ntt.co.jp

=======
Hello, this is the next version of "Allow substitute allocators
for PGresult".

This version changes the concept completely from the previous one:
this patch allows libpq to store received tuples in an alternative
tuple store.

The design guidelines are shown below.

 - Existing libpq client code need not be modified.

 - Existing libpq clients run with roughly the same performance,
   and the modified dblink runs somewhat faster and requires less
   memory.

I have roughly measured the run time and memory requirement of
three configurations on CentOS 6 under VirtualBox (2GB memory, 4
cores) running on Windows 7 / Core i7, transferring (30 bytes * 2
cols) * 2000000 tuples (120MB net) within this virtual machine. The
results are below.

                                  xfer time     Peak RSS
Original                        : 6.02s         850MB
libpq patch + Original dblink   : 6.11s         850MB
full patch                      : 4.44s         643MB

The xfer time here is the mean of five 'real' times, measured by
running an SQL script like the following after a warm-up run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b 
bytea) limit 1;

select dblink_disconnect('c');
===
$  for i in $(seq 1 10); do time psql test -f t.sql; done 
===

Peak RSS was measured by reading the heap Rss value in /proc/[pid]/smaps.


The combination of the patched libpq and the original dblink seems
somewhat slower, but the difference also seems to be within the
margin of error. If this slowdown is not acceptable, it could
probably be recovered by restoring the previous static call path, at
the cost of some code redundancy.

On the other hand, the fully patched version is clearly faster and
requires less memory. Isn't that nice?

This patch consists of two sub-patches.

The first is a patch for libpq that allows the tuple storage
mechanism to be replaced. The default behavior is not changed, so
existing libpq clients should run with it unmodified.

The second modifies dblink to store received tuples directly into a
tuplestore using the mechanism above.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..a360d78 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterTupleAdder	  161
+PQgetAsCstring		  162
+PQgetAddTupleParam	  163
+PQsetAddTupleErrMes	  164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 50f3f83..437be26 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
 	conn->allow_ssl_try = true;
 	conn->wait_ssl_try = false;
 #endif
+	conn->addTupleFunc = NULL;
 
 	/*
 	 * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5064,3 +5065,19 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
 	return prev;
 }
+
+/*
+ * PQregisterTupleAdder
+ *	Register an alternative tuple storage function, and a contextual
+ *	parameter that it can fetch with PQgetAddTupleParam, for the PGresults
+ *	created on conn from now on.  Passing func = NULL restores libpq's
+ *	default tuple storage.  Does nothing if conn is NULL.
+ */
+void
+PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param)
+{
+	if (!conn)
+		return;
+	conn->addTupleFunc = func;
+	conn->addTupleFuncParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..c8ec9bd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -48,7 +48,6 @@ char	   *const pgresStatus[] = {
 static int	static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
-
 static PGEvent *dupEvents(PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
@@ -66,7 +65,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
-
+static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func,
+								   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
 
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 	result->curBlock = NULL;
 	result->curOffset = 0;
 	result->spaceLeft = 0;
+	result->addTupleFunc = pqDefaultAddTupleFunc;
+	result->addTupleFuncParam = NULL;
+	result->addTupleFuncErrMes = NULL;
 
 	if (conn)
 	{
@@ -194,6 +198,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 			}
 			result->nEvents = conn->nEvents;
 		}
+
+		if (conn->addTupleFunc)
+		{
+			result->addTupleFunc = conn->addTupleFunc;
+			result->addTupleFuncParam = conn->addTupleFuncParam;
+		}
 	}
 	else
 	{
@@ -487,6 +497,44 @@ PQresultAlloc(PGresult *res, size_t nBytes)
 	return pqResultAlloc(res, nBytes, TRUE);
 }
 
+/*
+ * pqDefaultAddTupleFunc
+ *	  Default tuple storage function: keeps the tuple in the PGresult itself,
+ *	  using pqResultAlloc for memory and pqAddTuple to append the row.
+ *
+ * ADDTUP_ALLOC_BINARY must return word-aligned memory, so it passes TRUE as
+ * pqResultAlloc's isBinary argument, exactly as the PGresAttValue array and
+ * binary values were allocated before tuple adders existed; text values
+ * pass FALSE.  The PGresAttValue array (id -1) is remembered in
+ * addTupleFuncParam so that ADDTUP_ADD_TUPLE can pass it to pqAddTuple.
+ */
+static void *
+pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len)
+{
+	void	   *p;
+
+	switch (func)
+	{
+		case ADDTUP_ALLOC_TEXT:
+			return pqResultAlloc(res, len, FALSE);
+
+		case ADDTUP_ALLOC_BINARY:
+			p = pqResultAlloc(res, len, TRUE);
+
+			if (id == -1)
+				res->addTupleFuncParam = p;
+
+			return p;
+
+		case ADDTUP_ADD_TUPLE:
+			return pqAddTuple(res, res->addTupleFuncParam);
+
+		default:
+			/* unknown request */
+			break;
+	}
+	return NULL;
+}
+
 /*
  * pqResultAlloc -
  *		Allocate subsidiary storage for a PGresult.
@@ -830,9 +867,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *	  add a row pointer to the PGresult structure, growing it if necessary
- *	  Returns TRUE if OK, FALSE if not enough memory to add the row
+ *	  Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
 	if (res->ntups >= res->tupArrSize)
@@ -858,13 +895,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
 			newTuples = (PGresAttValue **)
 				realloc(res->tuples, newSize * sizeof(PGresAttValue *));
 		if (!newTuples)
-			return FALSE;		/* malloc or realloc failed */
+			return NULL;		/* malloc or realloc failed */
 		res->tupArrSize = newSize;
 		res->tuples = newTuples;
 	}
 	res->tuples[res->ntups] = tup;
 	res->ntups++;
-	return TRUE;
+	return tup;
 }
 
 /*
@@ -2822,6 +2859,51 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
 		return 0;
 }
 
+/* PQgetAsCstring:
+ *	returns the value of a field as a C string, or NULL if the field is
+ *	NULL (or if attval itself is NULL).
+ */
+char *
+PQgetAsCstring(PGresAttValue *attval)
+{
+	if (!attval || attval->len == NULL_LEN)
+		return NULL;
+	return attval->value;
+}
+
+/* PQgetAddTupleParam:
+ *	returns the contextual parameter registered to the PGconn with
+ *	PQregisterTupleAdder and copied into this PGresult, or NULL if res
+ *	is NULL.
+ */
+void *
+PQgetAddTupleParam(const PGresult *res)
+{
+	if (!res)
+		return NULL;
+	return res->addTupleFuncParam;
+}
+
+/* PQsetAddTupleErrMes:
+ *	sets the error message passed back to the caller of addTupleFunc.
+ *	mes must be a malloc'ed block; libpq takes ownership and frees it
+ *	when it reports the error.  A previously set message is freed, so
+ *	a message can be replaced, or cleared by passing NULL.  If res is
+ *	NULL there is nowhere to keep mes, so it is freed immediately.
+ */
+void
+PQsetAddTupleErrMes(PGresult *res, char *mes)
+{
+	if (!res)
+	{
+		free(mes);
+		return;
+	}
+	/* Free existing message */
+	free(res->addTupleFuncErrMes);
+	res->addTupleFuncErrMes = mes;
+}
+
 /* PQnparams:
  *	returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 77c4d5a..c7f74ae 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
 	if (conn->curTuple == NULL)
 	{
 		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+			result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+								 nfields * sizeof(PGresAttValue));
 		if (conn->curTuple == NULL)
-			goto outOfMemory;
+			goto addTupleError;
 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
 		/*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 	{
 		bitmap = (char *) malloc(nbytes);
 		if (!bitmap)
-			goto outOfMemory;
+			goto addTupleError;
 	}
 
 	if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
 				vlen = 0;
 			if (tup[i].value == NULL)
 			{
-				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+				AddTupFunc func =
+					(binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+				tup[i].value =
+					(char *) result->addTupleFunc(result, func, i, vlen + 1);
 				if (tup[i].value == NULL)
-					goto outOfMemory;
+					goto addTupleError;
 			}
 			tup[i].len = vlen;
 			/* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
 	}
 
 	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
+	if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+		goto addTupleError;
+
 	/* and reset for a new message */
 	conn->curTuple = NULL;
 
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 		free(bitmap);
 	return 0;
 
-outOfMemory:
+addTupleError:
 	/* Replace partially constructed result with an error result */
 
 	/*
@@ -829,8 +834,25 @@ outOfMemory:
 	 * there's not enough memory to concatenate messages...
 	 */
-	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+	{
+		/*
+		 * The message set by addTupleFunc, if any, belongs to the result
+		 * that pqClearAsyncResult() frees, so take it over first.
+		 */
+		char	   *errmes = result->addTupleFuncErrMes;
+
+		result->addTupleFuncErrMes = NULL;
+		pqClearAsyncResult(conn);
+
+		/* Report that message if there is one; assume out of memory if not */
+		if (errmes)
+		{
+			printfPQExpBuffer(&conn->errorMessage, "%s", errmes);
+			free(errmes);
+		}
+		else
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory for query result\n"));
+	}
 
 	/*
 	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 45a84d8..d14b57a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	if (conn->curTuple == NULL)
 	{
 		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+			result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+								 nfields * sizeof(PGresAttValue));
 		if (conn->curTuple == NULL)
-			goto outOfMemory;
+			goto addTupleError;
 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 	}
 	tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
 			vlen = 0;
 		if (tup[i].value == NULL)
 		{
-			bool		isbinary = (result->attDescs[i].format != 0);
-
-			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+			AddTupFunc func = (result->attDescs[i].format != 0 ?
+							   ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+			tup[i].value =
+				(char *) result->addTupleFunc(result, func, i, vlen + 1);
 			if (tup[i].value == NULL)
-				goto outOfMemory;
+				goto addTupleError;
 		}
 		tup[i].len = vlen;
 		/* read in the value */
@@ -689,22 +691,40 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	}
 
 	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
+	if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+		goto addTupleError;
+
 	/* and reset for a new message */
 	conn->curTuple = NULL;
 
 	return 0;
 
-outOfMemory:
+addTupleError:
 
 	/*
 	 * Replace partially constructed result with an error result. First
 	 * discard the old result to try to win back some memory.
 	 */
-	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+	{
+		/*
+		 * The message set by addTupleFunc, if any, belongs to the result
+		 * that pqClearAsyncResult() frees, so take it over first.
+		 */
+		char	   *errmes = result->addTupleFuncErrMes;
+
+		result->addTupleFuncErrMes = NULL;
+		pqClearAsyncResult(conn);
+
+		/* Report that message if there is one; assume out of memory if not */
+		if (errmes)
+		{
+			printfPQExpBuffer(&conn->errorMessage, "%s", errmes);
+			free(errmes);
+		}
+		else
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory for query result\n"));
+	}
 	pqSaveErrorResult(conn);
 
 	/* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..bdce294 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/* AddTupFunc is passed to an addTupleFunction as its func argument and
+ * tells it which request to carry out. See addTupleFunction for
+ * details */
+typedef enum 
+{
+	ADDTUP_ALLOC_TEXT,          /* Return memory for a text value; need not be aligned */
+	ADDTUP_ALLOC_BINARY,        /* Return word-aligned memory for a binary value */
+	ADDTUP_ADD_TUPLE            /* Add the completed tuple into tuple storage */
+} AddTupFunc;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -225,6 +235,12 @@ typedef struct pgresAttDesc
 	int			atttypmod;		/* type-specific modifier info */
 } PGresAttDesc;
 
+typedef struct pgresAttValue
+{
+	int			len;			/* length in bytes of the value */
+	char	   *value;			/* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* ----------------
  * Exported functions of libpq
  * ----------------
@@ -416,6 +432,52 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
 			 const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for tuple storage function.
+ *
+ * This function pointer is used as the tuple storage function in
+ * PGresult and PGconn.
+ *
+ * addTupleFunction is called for the three kinds of request designated
+ * by the enum AddTupFunc (the func argument).
+ *
+ * id identifies the requested memory block.  The caller passes -1 for
+ * the PGresAttValue array, and 0 to (number of columns - 1) for the
+ * value of each column.
+ *
+ * ADDTUP_ALLOC_TEXT requests a memory block of size bytes for a text
+ * value; it need not be aligned to a word boundary.
+ *
+ * ADDTUP_ALLOC_BINARY requests a memory block of size bytes for a
+ * binary value; it must be aligned to a word boundary.
+ *
+ * ADDTUP_ADD_TUPLE requests adding the tuple data into storage, and
+ * freeing the memory blocks allocated by this function if necessary.
+ * id and size are ignored.
+ *
+ * This function must return a non-NULL value on success and NULL on
+ * failure; on failure it may set an error message, in malloc'ed memory,
+ * with PQsetAddTupleErrMes.  The caller assumes out of memory if no
+ * error message was set.  This function is assumed not to throw any
+ * exception.
+ */
+	typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func,
+									  int id, size_t size);
+
+/*
+ * Register an alternative tuple storage function to PGconn.
+ *
+ * PGresults created on the connection afterwards call func to store
+ * rows one by one instead of using their own tuple storage; passing
+ * NULL as func restores the default storage.
+ *
+ * func is the tuple storage function (see addTupleFunction), and
+ * addTupFuncParam is a contextual pointer that func can retrieve with
+ * PQgetAddTupleParam.
+ */
+extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func,
+								 void *addTupFuncParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
 
@@ -454,6 +516,9 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern char *PQgetAsCstring(PGresAttValue *attdesc);
+extern void *PQgetAddTupleParam(const PGresult *res);
+extern void	PQsetAddTupleErrMes(PGresult *res, char *mes);
 extern int	PQnparams(const PGresult *res);
 extern Oid	PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 64dfcb2..45e4c93 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 
 #define NULL_LEN		(-1)	/* pg_result len for NULL value */
 
-typedef struct pgresAttValue
-{
-	int			len;			/* length in bytes of the value */
-	char	   *value;			/* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
 	PGresult_data *curBlock;	/* most recently allocated block */
 	int			curOffset;		/* start offset of free space in block */
 	int			spaceLeft;		/* number of free bytes remaining in block */
+
+	addTupleFunction addTupleFunc; /* Tuple storage function. See
+									* addTupleFunction for details. */
+	void *addTupleFuncParam;       /* Contextual parameter for addTupleFunc */
+	char *addTupleFuncErrMes;      /* Error message returned from addTupFunc */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+    /* Tuple store function. The two fields below is copied to newly
+	 * created PGresult if addTupleFunc is not NULL. Use default
+	 * function if addTupleFunc is NULL. */
+	addTupleFunction addTupleFunc; /* Tuple storage function. See
+									* addTupleFunction for details. */
+	void *addTupleFuncParam;       /* Contextual parameter for addTupFunc */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int	pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
 				   const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 62c810a..fb2e10e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+	Tuplestorestate *tuplestore;	/* destination of the received rows */
+	int nattrs;					/* number of columns of the local rowtype */
+	AttInMetadata *attinmeta;	/* input metadata for BuildTupleFromCStrings */
+	MemoryContext oldcontext;	/* context restored by finishStoreInfo */
+	char *attrvalbuf;			/* PGresAttValue array handed to libpq */
+	void **valbuf;				/* per-column value buffers */
+	size_t *valbufsize;			/* allocated size of each valbuf entry */
+	bool error_occurred;		/* addTuple failed; refuse further requests */
+	bool nummismatch;			/* report the failure as a rowtype mismatch */
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
 				   int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size);
+
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	int			howmany = 0;
 	bool		fail = true;	/* default to backward compatible */
+	storeInfo   storeinfo;
 
 	DBLINK_INIT;
 
@@ -559,15 +576,37 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
+	 * Rows are stored into storeinfo.tuplestore instead of into the
+	 * PGresult returned by PQexec below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
+	/*
 	 * Try to execute the query.  Note that since libpq uses malloc, the
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
 	res = PQexec(conn, buf.data);
+	finishStoreInfo(&storeinfo);
+
+	/* storeinfo lives in this stack frame, so unregister it right away */
+	PQregisterTupleAdder(conn, NULL, NULL);
+
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
+		/* This is only for backward compatibility */
+		if (storeinfo.nummismatch)
+		{
+			/* dblink_res_error is bypassed, so release res here */
+			PQclear(res);
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
 		return (Datum) 0;
 	}
@@ -580,7 +612,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
 
-	materializeResult(fcinfo, res);
+	/* the rows are already in the tuplestore; only res itself is left */
+	PQclear(res);
 	return (Datum) 0;
 }
 
@@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible */
 	bool		freeconn = false;
+	storeInfo   storeinfo;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +747,289 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	rsinfo->setResult = NULL;
 	rsinfo->setDesc = NULL;
 
+
+	/*
+	 * Rows are stored into storeinfo.tuplestore instead of into the
+	 * PGresult returned by PQexec/PQgetResult below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
 	/* synchronous query, or async result retrieval */
 	if (!is_async)
 		res = PQexec(conn, sql);
 	else
-	{
 		res = PQgetResult(conn);
-		/* NULL means we're all done with the async results */
-		if (!res)
-			return (Datum) 0;
-	}
 
-	/* if needed, close the connection to the database and cleanup */
-	if (freeconn)
-		PQfinish(conn);
+	finishStoreInfo(&storeinfo);
+
+	/* storeinfo lives in this stack frame, so unregister it right away */
+	PQregisterTupleAdder(conn, NULL, NULL);
 
-	if (!res ||
-		(PQresultStatus(res) != PGRES_COMMAND_OK &&
-		 PQresultStatus(res) != PGRES_TUPLES_OK))
+	/* NULL res from async get means we're all done with the results */
+	if (res || !is_async)
 	{
-		dblink_res_error(conname, res, "could not execute query", fail);
-		return (Datum) 0;
+		if (freeconn)
+			PQfinish(conn);
+
+		if (!res ||
+			(PQresultStatus(res) != PGRES_COMMAND_OK &&
+			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		{
+			/* This is only for backward compatibility */
+			if (storeinfo.nummismatch)
+			{
+				/* dblink_res_error is bypassed, so release res here */
+				PQclear(res);
+				ereport(ERROR,
+						(errcode(ERRCODE_DATATYPE_MISMATCH),
+						 errmsg("remote query result rowtype does not match "
+								"the specified FROM clause rowtype")));
+			}
+			dblink_res_error(conname, res, "could not execute query", fail);
+			return (Datum) 0;
+		}
 	}
 
-	materializeResult(fcinfo, res);
+	/*
+	 * The rows are already in the tuplestore, so only res itself is left
+	 * to release; res may be NULL here, which PQclear accepts.
+	 *
+	 * NOTE(review): unlike the removed materializeResult(), a
+	 * PGRES_COMMAND_OK result no longer yields a "status" row, and rows
+	 * that libpq stored in res before addTuple was registered (possible
+	 * for async queries) are dropped.  Confirm this is acceptable.
+	 */
+	PQclear(res);
 	return (Datum) 0;
 }
 
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
+/*
+ * initStoreInfo
+ *		Set up sinfo so that addTuple stores the rows of a remote query
+ *		directly into a tuplestore, and hand that tuplestore and the result
+ *		tuple descriptor over to fcinfo's ReturnSetInfo.
+ *
+ * Leaves CurrentMemoryContext switched to the per-query memory context;
+ * finishStoreInfo switches back.
+ */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-	Assert(rsinfo->returnMode == SFRM_Materialize);
-
-	PG_TRY();
+	TupleDesc	tupdesc;
+
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 	{
-		TupleDesc	tupdesc;
-		bool		is_sql_cmd = false;
-		int			ntuples;
-		int			nfields;
+		case TYPEFUNC_COMPOSITE:
+			/* success */
+			break;
+		case TYPEFUNC_RECORD:
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
+
+	sinfo->oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
+
+	/* make sure we have a persistent copy of the tupdesc */
+	tupdesc = CreateTupleDescCopy(tupdesc);
+
+	sinfo->error_occurred = FALSE;
+	sinfo->nummismatch = FALSE;
+	sinfo->nattrs = tupdesc->natts;
+	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+	/* Per-column value buffers; addTuple grows them with realloc */
+	sinfo->valbuf = (void **) calloc(sinfo->nattrs, sizeof(void *));
+	sinfo->valbufsize = (size_t *) calloc(sinfo->nattrs, sizeof(size_t));
 
-		if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			is_sql_cmd = true;
-
-			/*
-			 * need a tuple descriptor representing one TEXT column to return
-			 * the command status string as our result tuple
-			 */
-			tupdesc = CreateTemplateTupleDesc(1, false);
-			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-							   TEXTOID, -1, 0);
-			ntuples = 1;
-			nfields = 1;
-		}
-		else
-		{
-			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+	/* Buffer that libpq fills with each row's PGresAttValue array */
+	sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
+
+	/* These come from malloc/calloc, not palloc, so check them ourselves */
+	if (sinfo->valbuf == NULL || sinfo->valbufsize == NULL ||
+		sinfo->attrvalbuf == NULL)
+	{
+		free(sinfo->valbuf);
+		free(sinfo->valbufsize);
+		free(sinfo->attrvalbuf);
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+	}
 
-			is_sql_cmd = false;
+	rsinfo->setResult = sinfo->tuplestore;
+	rsinfo->setDesc = tupdesc;
+}
 
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
+/*
+ * finishStoreInfo
+ *		Free the buffers allocated by initStoreInfo and switch back to the
+ *		memory context it saved.  The tuplestore itself is kept, since it
+ *		was handed over to the ReturnSetInfo.  Safe to call more than once:
+ *		addTuple may call it on error and the caller calls it again later.
+ */
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+	int			i;
 
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
-			ntuples = PQntuples(res);
-			nfields = PQnfields(res);
+	if (sinfo->valbuf)
+	{
+		for (i = 0; i < sinfo->nattrs; i++)
+		{
+			free(sinfo->valbuf[i]);
 		}
+		free(sinfo->valbuf);
+		sinfo->valbuf = NULL;
+	}
+	free(sinfo->valbufsize);
+	sinfo->valbufsize = NULL;
+	free(sinfo->attrvalbuf);
+	sinfo->attrvalbuf = NULL;
+	MemoryContextSwitchTo(sinfo->oldcontext);
+}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+/*
+ * addTuple
+ *		Tuple storage function that dblink registers with
+ *		PQregisterTupleAdder.  It hands libpq buffers owned by the storeInfo
+ *		and, on ADDTUP_ADD_TUPLE, stores the completed row as a tuple in the
+ *		tuplestore.
+ *
+ * Returns NULL on failure, setting an error message with
+ * PQsetAddTupleErrMes where one is available.  Errors raised while
+ * building the tuple are caught here, so nothing is thrown through libpq.
+ */
+static void *
+addTuple(PGresult *res, AddTupFunc func, int id, size_t size)
+{
+	storeInfo  *sinfo = (storeInfo *) PQgetAddTupleParam(res);
+	HeapTuple	tuple;
+	int			fields = PQnfields(res);
+	int			i;
+	PGresAttValue *attval;
+	char	  **cstrs;
 
-		if (ntuples > 0)
-		{
-			AttInMetadata *attinmeta;
-			Tuplestorestate *tupstore;
-			MemoryContext oldcontext;
-			int			row;
-			char	  **values;
-
-			attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-			oldcontext = MemoryContextSwitchTo(
-									rsinfo->econtext->ecxt_per_query_memory);
-			tupstore = tuplestore_begin_heap(true, false, work_mem);
-			rsinfo->setResult = tupstore;
-			rsinfo->setDesc = tupdesc;
-			MemoryContextSwitchTo(oldcontext);
+	/* after a failure, refuse every request until the query is over */
+	if (sinfo->error_occurred)
+		return NULL;
 
-			values = (char **) palloc(nfields * sizeof(char *));
+	switch (func)
+	{
+		case ADDTUP_ALLOC_TEXT:
+		case ADDTUP_ALLOC_BINARY:
+
+			/*
+			 * A remote row wider than the local rowtype shows up here first,
+			 * as an oversized PGresAttValue array (id -1) or an out-of-range
+			 * column id.  Refuse it rather than overrun our buffers.
+			 */
+			if (id >= sinfo->nattrs ||
+				(id == -1 && size > sinfo->nattrs * sizeof(PGresAttValue)))
+			{
+				sinfo->error_occurred = TRUE;
+				sinfo->nummismatch = TRUE;
+				PQsetAddTupleErrMes(res,
+									strdup("remote query result rowtype does not match "
+										   "the specified FROM clause rowtype"));
+				return NULL;
+			}
+
+			/* id -1 asks for the PGresAttValue array itself */
+			if (id == -1)
+				return sinfo->attrvalbuf;
+			if (id < 0)
+				return NULL;
 
-			/* put all tuples into the tuplestore */
-			for (row = 0; row < ntuples; row++)
+			if (sinfo->valbufsize[id] < size)
 			{
-				HeapTuple	tuple;
+				/* realloc(NULL, size) behaves like malloc(size) */
+				void	   *newbuf = realloc(sinfo->valbuf[id], size);
+
+				/* on failure, keep the old block for finishStoreInfo */
+				if (newbuf == NULL)
+					return NULL;
+				sinfo->valbuf[id] = newbuf;
+				sinfo->valbufsize[id] = size;
+			}
+			return sinfo->valbuf[id];
 
-				if (!is_sql_cmd)
-				{
-					int			i;
+		case ADDTUP_ADD_TUPLE:
+			break;				/* handled below */
+		default:
+			/* unknown request */
+			return NULL;
+	}
 
-					for (i = 0; i < nfields; i++)
-					{
-						if (PQgetisnull(res, row, i))
-							values[i] = NULL;
-						else
-							values[i] = PQgetvalue(res, row, i);
-					}
-				}
-				else
-				{
-					values[0] = PQcmdStatus(res);
-				}
+	/* a remote row narrower than the local rowtype is caught here */
+	if (sinfo->nattrs != fields)
+	{
+		sinfo->error_occurred = TRUE;
+		sinfo->nummismatch = TRUE;
+		finishStoreInfo(sinfo);
 
-				/* build the tuple and put it into the tuplestore. */
-				tuple = BuildTupleFromCStrings(attinmeta, values);
-				tuplestore_puttuple(tupstore, tuple);
-			}
+		PQsetAddTupleErrMes(res,
+							strdup("remote query result rowtype does not match "
+								   "the specified FROM clause rowtype"));
+		return NULL;
+	}
 
-			/* clean up and return the tuplestore */
-			tuplestore_donestoring(tupstore);
-		}
+	/*
+	 * Rewrite the PGresAttValue array in attrvalbuf into an array of C
+	 * string pointers in place.  cstrs[i] can only overlap attval[0..i],
+	 * all of which have been read by the time it is written.
+	 */
+	Assert(sizeof(char *) <= sizeof(PGresAttValue));
+	attval = (PGresAttValue *) sinfo->attrvalbuf;
+	cstrs = (char **) sinfo->attrvalbuf;
+	for (i = 0; i < fields; i++)
+		cstrs[i] = PQgetAsCstring(attval++);
 
-		PQclear(res);
+	PG_TRY();
+	{
+		tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+		tuplestore_puttuple(sinfo->tuplestore, tuple);
 	}
 	PG_CATCH();
 	{
-		/* be sure to release the libpq result */
-		PQclear(res);
-		PG_RE_THROW();
+		/*
+		 * Hand the error message back through libpq and cancel the
+		 * exception, which must not propagate through libpq.
+		 */
+		ErrorData  *edata;
+
+		/*
+		 * Not a rowtype mismatch: the caller reports this message.
+		 * NOTE(review): it goes through dblink_res_error, so with fail =
+		 * false it may no longer be a hard ERROR as it was under
+		 * materializeResult's PG_RE_THROW; confirm that is intended.
+		 */
+		sinfo->error_occurred = TRUE;
+
+		/* switches back to the caller's context, out of ErrorContext */
+		finishStoreInfo(sinfo);
+
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		PQsetAddTupleErrMes(res, strdup(edata->message));
+		FreeErrorData(edata);
+		return NULL;
 	}
 	PG_END_TRY();
+
+	return sinfo->attrvalbuf;
 }
 
 /*
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to