I'm sorry for having coded a silly bug.

The previous patch had a bug in the realloc size calculation.
Also, the separation of the 'connname patch' was incomplete in the regression test.
Both are fixed in this patch.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..4de28ef 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+	Tuplestorestate *tuplestore;
+	int nattrs;
+	MemoryContext oldcontext;
+	AttInMetadata *attinmeta;
+	char* valbuf;
+	int valbuflen;
+	char **cstrs;
+	bool error_occurred;
+	bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
 				   int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -111,6 +127,9 @@ typedef struct remoteConnHashEnt
 /* initial number of connection hashes */
 #define NUMCONN 16
 
+/* Initial block size for value buffer in storeHandler */
+#define INITBUFLEN 64
+
 /* general utility */
 #define xpfree(var_) \
 	do { \
@@ -503,6 +522,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	int			howmany = 0;
 	bool		fail = true;	/* default to backward compatible */
+	storeInfo   storeinfo;
 
 	DBLINK_INIT;
 
@@ -559,15 +579,51 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result returned by PQexec below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+	/*
 	 * Try to execute the query.  Note that since libpq uses malloc, the
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	PG_TRY();
+	{
+		res = PQexec(conn, buf.data);
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+
+		finishStoreInfo(&storeinfo);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Skip remaining results when storeHandler raises an exception. */
+		PQskipResult(conn, TRUE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+
+	finishStoreInfo(&storeinfo);
+
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
+		/* finishStoreInfo saves the fields referred to below. */
+		if (storeinfo.nummismatch)
+		{
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
+
 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
 		return (Datum) 0;
 	}
@@ -579,8 +635,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
 				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
@@ -640,6 +696,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible */
 	bool		freeconn = false;
+	storeInfo   storeinfo;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -660,6 +717,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		{
 			/* text,text,bool */
 			DBLINK_GET_CONN;
+			conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			fail = PG_GETARG_BOOL(2);
 		}
@@ -715,164 +773,234 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	rsinfo->setResult = NULL;
 	rsinfo->setDesc = NULL;
 
-	/* synchronous query, or async result retrieval */
-	if (!is_async)
-		res = PQexec(conn, sql);
-	else
+
+	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result returned by PQexec/PQgetResult below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+	PG_TRY();
 	{
-		res = PQgetResult(conn);
-		/* NULL means we're all done with the async results */
-		if (!res)
-			return (Datum) 0;
+		/* synchronous query, or async result retrieval */
+		if (!is_async)
+			res = PQexec(conn, sql);
+		else
+			res = PQgetResult(conn);
 	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
 
-	/* if needed, close the connection to the database and cleanup */
-	if (freeconn)
-		PQfinish(conn);
+		finishStoreInfo(&storeinfo);
+		edata = CopyErrorData();
+		FlushErrorState();
 
-	if (!res ||
-		(PQresultStatus(res) != PGRES_COMMAND_OK &&
-		 PQresultStatus(res) != PGRES_TUPLES_OK))
+		/* Skip remaining results when storeHandler raises an exception. */
+		PQskipResult(conn, TRUE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+
+	finishStoreInfo(&storeinfo);
+
+	/* NULL res from async get means we're all done with the results */
+	if (res || !is_async)
 	{
-		dblink_res_error(conname, res, "could not execute query", fail);
-		return (Datum) 0;
+		if (freeconn)
+			PQfinish(conn);
+
+		/*
+		 * Exclude a mismatch in the number of columns here so as to
+		 * behave as before.
+		 */
+		if (!res ||
+			(PQresultStatus(res) != PGRES_COMMAND_OK &&
+			 PQresultStatus(res) != PGRES_TUPLES_OK &&
+			 !storeinfo.nummismatch))
+		{
+			dblink_res_error(conname, res, "could not execute query", fail);
+			return (Datum) 0;
+		}
+
+		/* Set command return status when the query was a command. */
+		if (PQresultStatus(res) == PGRES_COMMAND_OK)
+		{
+			char *values[1];
+			HeapTuple tuple;
+			AttInMetadata *attinmeta;
+			ReturnSetInfo *rcinfo = (ReturnSetInfo*)fcinfo->resultinfo;
+			
+			values[0] = PQcmdStatus(res);
+			attinmeta = TupleDescGetAttInMetadata(rcinfo->setDesc);
+			tuple = BuildTupleFromCStrings(attinmeta, values);
+			tuplestore_puttuple(rcinfo->setResult, tuple);
+		}
+		else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD)
+		{
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+		}
+
+		/* finishStoreInfo saves the fields referred to below. */
+		if (storeinfo.nummismatch)
+		{
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
 	}
 
-	materializeResult(fcinfo, res);
+	if (res)
+		PQclear(res);
+
 	return (Datum) 0;
 }
 
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
 
-	Assert(rsinfo->returnMode == SFRM_Materialize);
-
-	PG_TRY();
+	sinfo->oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
+		
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 	{
-		TupleDesc	tupdesc;
-		bool		is_sql_cmd = false;
-		int			ntuples;
-		int			nfields;
-
-		if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			is_sql_cmd = true;
-
-			/*
-			 * need a tuple descriptor representing one TEXT column to return
-			 * the command status string as our result tuple
-			 */
+		case TYPEFUNC_COMPOSITE:
+			tupdesc = CreateTupleDescCopy(tupdesc);
+			sinfo->nattrs = tupdesc->natts;
+			break;
+		case TYPEFUNC_RECORD:
 			tupdesc = CreateTemplateTupleDesc(1, false);
 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
 							   TEXTOID, -1, 0);
-			ntuples = 1;
-			nfields = 1;
-		}
-		else
-		{
-			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+			sinfo->nattrs = 1;
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
 
-			is_sql_cmd = false;
+	/* make sure we have a persistent copy of the tupdesc */
 
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
+	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	sinfo->error_occurred = FALSE;
+	sinfo->nummismatch = FALSE;
+	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+	sinfo->valbuflen = INITBUFLEN;
+	sinfo->valbuf = (char *)palloc(sinfo->valbuflen);
+	sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *));
 
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
-			ntuples = PQntuples(res);
-			nfields = PQnfields(res);
-		}
+	rsinfo->setResult = sinfo->tuplestore;
+	rsinfo->setDesc = tupdesc;
+}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+	if (sinfo->valbuf)
+	{
+		pfree(sinfo->valbuf);
+		sinfo->valbuf = NULL;
+	}
 
-		if (ntuples > 0)
-		{
-			AttInMetadata *attinmeta;
-			Tuplestorestate *tupstore;
-			MemoryContext oldcontext;
-			int			row;
-			char	  **values;
-
-			attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-			oldcontext = MemoryContextSwitchTo(
-									rsinfo->econtext->ecxt_per_query_memory);
-			tupstore = tuplestore_begin_heap(true, false, work_mem);
-			rsinfo->setResult = tupstore;
-			rsinfo->setDesc = tupdesc;
-			MemoryContextSwitchTo(oldcontext);
+	if (sinfo->cstrs)
+	{
+		pfree(sinfo->cstrs);
+		sinfo->cstrs = NULL;
+	}
 
-			values = (char **) palloc(nfields * sizeof(char *));
+	MemoryContextSwitchTo(sinfo->oldcontext);
+}
 
-			/* put all tuples into the tuplestore */
-			for (row = 0; row < ntuples; row++)
-			{
-				HeapTuple	tuple;
+/* Prototype of this function is PQrowProcessor */
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+	storeInfo *sinfo = (storeInfo *)param;
+	HeapTuple  tuple;
+	int        newbuflen;
+	int        fields = PQnfields(res);
+	int        i;
+	char       **cstrs = sinfo->cstrs;
+	char       *pbuf;
+
+	if (sinfo->error_occurred)
+		return -1;
+
+	if (sinfo->nattrs != fields)
+	{
+		sinfo->error_occurred = TRUE;
+		sinfo->nummismatch = TRUE;
+		finishStoreInfo(sinfo);
 
-				if (!is_sql_cmd)
-				{
-					int			i;
+		/* This error will be processed in dblink_record_internal() */
+		return -1;
+	}
 
-					for (i = 0; i < nfields; i++)
-					{
-						if (PQgetisnull(res, row, i))
-							values[i] = NULL;
-						else
-							values[i] = PQgetvalue(res, row, i);
-					}
-				}
-				else
-				{
-					values[0] = PQcmdStatus(res);
-				}
+	/*
+	 * Value input functions assume that the input string is
+	 * zero-terminated, so we must make each value so.
+	 */
 
-				/* build the tuple and put it into the tuplestore. */
-				tuple = BuildTupleFromCStrings(attinmeta, values);
-				tuplestore_puttuple(tupstore, tuple);
-			}
+	/*
+     * The length of the buffer for each field is value length + 1 for
+     * zero-termination
+     */
+	newbuflen = fields;
+	for(i = 0 ; i < fields ; i++)
+		newbuflen += columns[i].len;
+
+	if (newbuflen > sinfo->valbuflen)
+	{
+		int tmplen = sinfo->valbuflen * 2;
+		/*
+		 * Try to (re)allocate in bigger steps to avoid flood of allocations
+		 * on weird data.
+		 */
+		while (newbuflen > tmplen && tmplen >= 0)
+			tmplen *= 2;
 
-			/* clean up and return the tuplestore */
-			tuplestore_donestoring(tupstore);
-		}
+		/* Check whether the integer wrapped around. */
+		if (tmplen < 0)
+			elog(ERROR, "Buffer size for one row exceeds integer limit");
 
-		PQclear(res);
+		sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen);
+		sinfo->valbuflen = tmplen;
 	}
-	PG_CATCH();
+
+	pbuf = sinfo->valbuf;
+	for(i = 0 ; i < fields ; i++)
 	{
-		/* be sure to release the libpq result */
-		PQclear(res);
-		PG_RE_THROW();
+		int len = columns[i].len;
+		if (len < 0)
+			cstrs[i] = NULL;
+		else
+		{
+			cstrs[i] = pbuf;
+			memcpy(pbuf, columns[i].value, len);
+			pbuf += len;
+			*pbuf++ = '\0';
+		}
 	}
-	PG_END_TRY();
+
+	/*
+	 * These functions may throw an exception, which will be caught in
+	 * dblink_record_internal()
+	 */
+	tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+	tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+	return 1;
 }
 
 /*
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 4de28ef..05d7e98 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -733,6 +733,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			else
 			{
 				DBLINK_GET_CONN;
+				conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
 		}
@@ -763,6 +764,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		else
 			/* shouldn't happen */
 			elog(ERROR, "wrong number of arguments");
+
+		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 	}
 
 	if (!conn)
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 511dd5e..2dcba15 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -371,7 +371,7 @@ SELECT *
 FROM dblink('myconn','SELECT * FROM foobar',false) AS t(a int, b text, c text[])
 WHERE t.a > 7;
 NOTICE:  relation "foobar" does not exist
-CONTEXT:  Error occurred on dblink connection named "unnamed": could not execute query.
+CONTEXT:  Error occurred on dblink connection named "myconn": could not execute query.
  a | b | c 
 ---+---+---
 (0 rows)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to