From a2ae44429a68f9776d99285930b16f46c4c9197d Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Wed, 16 Nov 2022 10:49:45 +0900
Subject: [PATCH v1] Teach crosstab() to use SPI_cursor_* interface

Fixes memory issues for input queries that return many rows.
---
 contrib/tablefunc/tablefunc.c | 368 +++++++++++++++++++---------------
 1 file changed, 204 insertions(+), 164 deletions(-)

diff --git a/contrib/tablefunc/tablefunc.c b/contrib/tablefunc/tablefunc.c
index b967e6d4be..feebe497d7 100644
--- a/contrib/tablefunc/tablefunc.c
+++ b/contrib/tablefunc/tablefunc.c
@@ -356,12 +356,12 @@ crosstab(PG_FUNCTION_ARGS)
 {
 	char	   *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-	Tuplestorestate *tupstore;
+	Tuplestorestate *tupstore = NULL;
 	TupleDesc	tupdesc;
 	uint64		call_cntr;
 	uint64		max_calls;
 	AttInMetadata *attinmeta;
-	SPITupleTable *spi_tuptable;
+	SPITupleTable *spi_tuptable = NULL;
 	TupleDesc	spi_tupdesc;
 	bool		firstpass;
 	char	   *lastrowid;
@@ -371,6 +371,9 @@ crosstab(PG_FUNCTION_ARGS)
 	MemoryContext oldcontext;
 	int			ret;
 	uint64		proc;
+	SPIPlanPtr	plan;
+	Portal		portal;
+	bool		tupstore_initialized = false;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -389,204 +392,244 @@ crosstab(PG_FUNCTION_ARGS)
 		/* internal error */
 		elog(ERROR, "crosstab: SPI_connect returned %d", ret);
 
-	/* Retrieve the desired rows */
-	ret = SPI_execute(sql, true, 0);
-	proc = SPI_processed;
-
-	/* If no qualifying tuples, fall out early */
-	if (ret != SPI_OK_SELECT || proc == 0)
-	{
-		SPI_finish();
-		rsinfo->isDone = ExprEndResult;
-		PG_RETURN_NULL();
-	}
+	/* Create an SPI plan and a cursor for fetching the result rows. */
+	if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
+		/* internal error */
+		elog(ERROR, "SPI_prepare(\"%s\") failed", sql);
 
-	spi_tuptable = SPI_tuptable;
-	spi_tupdesc = spi_tuptable->tupdesc;
+	if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL)
+		/* internal error */
+		elog(ERROR, "SPI_cursor_open(\"%s\") failed", sql);
 
-	/*----------
-	 * The provided SQL query must always return three columns.
-	 *
-	 * 1. rowname
-	 *	the label or identifier for each row in the final result
-	 * 2. category
-	 *	the label or identifier for each column in the final result
-	 * 3. values
-	 *	the value for each column in the final result
-	 *----------
+	/*
+	 * Fetch rows from the cursor in batches to avoid overloading
+	 * SPI_tuptable and store them in the result tuplestore.
 	 */
-	if (spi_tupdesc->natts != 3)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("invalid source data SQL statement"),
-				 errdetail("The provided SQL must return 3 "
-						   "columns: rowid, category, and values.")));
-
-	/* get a tuple descriptor for our result type */
-	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+#define	CROSSTAB_BATCHSIZE	10000
+	for (;;)
 	{
-		case TYPEFUNC_COMPOSITE:
-			/* success */
-			break;
-		case TYPEFUNC_RECORD:
-			/* failed to determine actual type of RECORD */
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("function returning record called in context "
-							"that cannot accept type record")));
-			break;
-		default:
-			/* result type isn't composite */
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("return type must be a row type")));
-			break;
-	}
+		/* Retrieve the desired rows */
+		SPI_cursor_fetch(portal, true, CROSSTAB_BATCHSIZE);
+		proc = SPI_processed;
 
-	/*
-	 * Check that return tupdesc is compatible with the data we got from SPI,
-	 * at least based on number and type of attributes
-	 */
-	if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("return and sql tuple descriptions are " \
-						"incompatible")));
+		/* If no qualifying tuples, fall out early */
+		if (proc == 0)
+			goto done;
 
-	/*
-	 * switch to long-lived memory context
-	 */
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+		spi_tuptable = SPI_tuptable;
+		spi_tupdesc = spi_tuptable->tupdesc;
 
-	/* make sure we have a persistent copy of the result tupdesc */
-	tupdesc = CreateTupleDescCopy(tupdesc);
+		/*
+		 * If not already done, initialize the tuplestore to return the result
+		 * in.
+		 */
+		if (!tupstore_initialized)
+		{
+			/* Sanity checks. */
+
+			/*----------
+			 * The provided SQL query must always return three columns.
+			 *
+			 * 1. rowname
+			 *	the label or identifier for each row in the final result
+			 * 2. category
+			 *	the label or identifier for each column in the final result
+			 * 3. values
+			 *	the value for each column in the final result
+			 *----------
+			 */
+			if (spi_tupdesc->natts != 3)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid source data SQL statement"),
+						 errdetail("The provided SQL must return 3 "
+								   "columns: rowid, category, and values.")));
 
-	/* initialize our tuplestore in long-lived context */
-	tupstore =
-		tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
-							  false, work_mem);
+			/* get a tuple descriptor for our result type */
+			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+			{
+				case TYPEFUNC_COMPOSITE:
+					/* success */
+					break;
+				case TYPEFUNC_RECORD:
+					/* failed to determine actual type of RECORD */
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("function returning record called in context "
+									"that cannot accept type record")));
+					break;
+				default:
+					/* result type isn't composite */
+					ereport(ERROR,
+							(errcode(ERRCODE_DATATYPE_MISMATCH),
+							 errmsg("return type must be a row type")));
+					break;
+			}
 
-	MemoryContextSwitchTo(oldcontext);
+			/*
+			 * Check that return tupdesc is compatible with the data we got
+			 * from SPI, at least based on number and type of attributes
+			 */
+			if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("return and sql tuple descriptions are " \
+								"incompatible")));
+			/*
+			 * switch to long-lived memory context
+			 */
+			oldcontext = MemoryContextSwitchTo(per_query_ctx);
 
-	/*
-	 * Generate attribute metadata needed later to produce tuples from raw C
-	 * strings
-	 */
-	attinmeta = TupleDescGetAttInMetadata(tupdesc);
+			/* make sure we have a persistent copy of the result tupdesc */
+			tupdesc = CreateTupleDescCopy(tupdesc);
 
-	/* total number of tuples to be examined */
-	max_calls = proc;
+			/* initialize our tuplestore in long-lived context */
+			tupstore =
+				tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
+									  false, work_mem);
 
-	/* the return tuple always must have 1 rowid + num_categories columns */
-	num_categories = tupdesc->natts - 1;
+			MemoryContextSwitchTo(oldcontext);
 
-	firstpass = true;
-	lastrowid = NULL;
+			/*
+			 * Generate attribute metadata needed later to produce tuples from
+			 * raw C strings
+			 */
+			attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
-	for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
-	{
-		bool		skip_tuple = false;
-		char	  **values;
+			/* total number of tuples to be examined */
+			max_calls = proc;
 
-		/* allocate and zero space */
-		values = (char **) palloc0((1 + num_categories) * sizeof(char *));
+			/* the return tuple always must have 1 rowid + num_categories columns */
+			num_categories = tupdesc->natts - 1;
 
-		/*
-		 * now loop through the sql results and assign each value in sequence
-		 * to the next category
-		 */
-		for (i = 0; i < num_categories; i++)
-		{
-			HeapTuple	spi_tuple;
-			char	   *rowid;
+			tupstore_initialized = true;
+		}
 
-			/* see if we've gone too far already */
-			if (call_cntr >= max_calls)
-				break;
+		firstpass = true;
+		lastrowid = NULL;
 
-			/* get the next sql result tuple */
-			spi_tuple = spi_tuptable->vals[call_cntr];
+		for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
+		{
+			bool		skip_tuple = false;
+			char	  **values;
 
-			/* get the rowid from the current sql result tuple */
-			rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
+			/* allocate and zero space */
+			values = (char **) palloc0((1 + num_categories) * sizeof(char *));
 
 			/*
-			 * If this is the first pass through the values for this rowid,
-			 * set the first column to rowid
+			 * now loop through the sql results and assign each value in sequence
+			 * to the next category
 			 */
-			if (i == 0)
+			for (i = 0; i < num_categories; i++)
 			{
-				xpstrdup(values[0], rowid);
+				HeapTuple	spi_tuple;
+				char	   *rowid;
+
+				/* see if we've gone too far already */
+				if (call_cntr >= max_calls)
+					break;
+
+				/* get the next sql result tuple */
+				spi_tuple = spi_tuptable->vals[call_cntr];
+
+				/* get the rowid from the current sql result tuple */
+				rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
 
 				/*
-				 * Check to see if the rowid is the same as that of the last
-				 * tuple sent -- if so, skip this tuple entirely
+				 * If this is the first pass through the values for this rowid,
+				 * set the first column to rowid
 				 */
-				if (!firstpass && xstreq(lastrowid, rowid))
+				if (i == 0)
 				{
+					xpstrdup(values[0], rowid);
+
+					/*
+					 * Check to see if the rowid is the same as that of the last
+					 * tuple sent -- if so, skip this tuple entirely
+					 */
+					if (!firstpass && xstreq(lastrowid, rowid))
+					{
+						xpfree(rowid);
+						skip_tuple = true;
+						break;
+					}
+				}
+
+				/*
+				 * If rowid hasn't changed on us, continue building the output
+				 * tuple.
+				 */
+				if (xstreq(rowid, values[0]))
+				{
+					/*
+					 * Get the next category item value, which is always
+					 * attribute number three.
+					 *
+					 * Be careful to assign the value to the array index based
+					 * on which category we are presently processing.
+					 */
+					values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
+
+					/*
+					 * increment the counter since we consume a row for each
+					 * category, but not for last pass because the outer loop will
+					 * do that for us
+					 */
+					if (i < (num_categories - 1))
+						call_cntr++;
+					xpfree(rowid);
+				}
+				else
+				{
+					/*
+					 * We'll fill in NULLs for the missing values, but we need
+					 * to decrement the counter since this sql result row
+					 * doesn't belong to the current output tuple.
+					 */
+					call_cntr--;
 					xpfree(rowid);
-					skip_tuple = true;
 					break;
 				}
 			}
 
-			/*
-			 * If rowid hasn't changed on us, continue building the output
-			 * tuple.
-			 */
-			if (xstreq(rowid, values[0]))
+			if (!skip_tuple)
 			{
-				/*
-				 * Get the next category item value, which is always attribute
-				 * number three.
-				 *
-				 * Be careful to assign the value to the array index based on
-				 * which category we are presently processing.
-				 */
-				values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
+				HeapTuple	tuple;
 
-				/*
-				 * increment the counter since we consume a row for each
-				 * category, but not for last pass because the outer loop will
-				 * do that for us
-				 */
-				if (i < (num_categories - 1))
-					call_cntr++;
-				xpfree(rowid);
+				/* build the tuple and store it */
+				tuple = BuildTupleFromCStrings(attinmeta, values);
+				tuplestore_puttuple(tupstore, tuple);
+				heap_freetuple(tuple);
 			}
-			else
-			{
-				/*
-				 * We'll fill in NULLs for the missing values, but we need to
-				 * decrement the counter since this sql result row doesn't
-				 * belong to the current output tuple.
-				 */
-				call_cntr--;
-				xpfree(rowid);
-				break;
-			}
-		}
-
-		if (!skip_tuple)
-		{
-			HeapTuple	tuple;
 
-			/* build the tuple and store it */
-			tuple = BuildTupleFromCStrings(attinmeta, values);
-			tuplestore_puttuple(tupstore, tuple);
-			heap_freetuple(tuple);
+			/* Remember current rowid */
+			xpfree(lastrowid);
+			xpstrdup(lastrowid, values[0]);
+			firstpass = false;
+
+			/* Clean up */
+			for (i = 0; i < num_categories + 1; i++)
+				if (values[i] != NULL)
+					pfree(values[i]);
+			pfree(values);
 		}
 
-		/* Remember current rowid */
-		xpfree(lastrowid);
-		xpstrdup(lastrowid, values[0]);
-		firstpass = false;
+		/* Free up the memory of this batch's rows. */
+		SPI_freetuptable(spi_tuptable);
+	}
+
+done:
+	/* release SPI related resources (and return to caller's context) */
+	SPI_freetuptable(spi_tuptable);
+	SPI_cursor_close(portal);
+	SPI_freeplan(plan);
+	SPI_finish();
 
-		/* Clean up */
-		for (i = 0; i < num_categories + 1; i++)
-			if (values[i] != NULL)
-				pfree(values[i]);
-		pfree(values);
+	/* If the query returned 0 zeros, let the caller know so. */
+	if (!tupstore_initialized)
+	{
+		rsinfo->isDone = ExprEndResult;
+		PG_RETURN_NULL();
 	}
 
 	/* let the caller know we're sending back a tuplestore */
@@ -594,9 +637,6 @@ crosstab(PG_FUNCTION_ARGS)
 	rsinfo->setResult = tupstore;
 	rsinfo->setDesc = tupdesc;
 
-	/* release SPI related resources (and return to caller's context) */
-	SPI_finish();
-
 	return (Datum) 0;
 }
 
-- 
2.35.3

