Tom Lane wrote:

> I agree that it seems like a good idea to try.
> There will be more per-row overhead, but the increase in flexibility
> is likely to justify that.

Here's a POC patch implementing row-by-row fetching.

If it weren't for the per-row overhead, we could probably get rid of
ExecQueryUsingCursor() and use row-by-row fetches whenever
FETCH_COUNT is set, regardless of the form of the query.

However, the difference in processing time seems to be substantial: in
some quick tests with FETCH_COUNT=10000, I'm seeing the run time grow by
almost 1.5x on large datasets. I assume it's the cost of the extra allocations.
I would have hoped that avoiding the FETCH queries and associated
round-trips with the cursor method would compensate for that, but it
doesn't appear to be the case, at least with a fast local connection.

So in this patch, psql still uses the cursor method if the
query starts with "select", and otherwise falls back to row-by-row
fetching in the main code (ExecQueryAndProcessResults).
Either way, it solves the main issue: excessive memory consumption
for CTE and UPDATE/INSERT queries returning large result sets.


Best regards,

-- 
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 00627830c4..d3de9d8336 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error)
 		{
 			case PGRES_COMMAND_OK:
 			case PGRES_TUPLES_OK:
+			case PGRES_SINGLE_TUPLE:
 			case PGRES_EMPTY_QUERY:
 			case PGRES_COPY_IN:
 			case PGRES_COPY_OUT:
@@ -675,13 +676,13 @@ PrintNotifications(void)
  * Returns true if successful, false otherwise.
  */
 static bool
-PrintQueryTuples(const PGresult *result, const printQueryOpt *opt,
+PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
 				 FILE *printQueryFout)
 {
 	bool		ok = true;
 	FILE	   *fout = printQueryFout ? printQueryFout : pset.queryFout;
 
-	printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile);
+	printQueryChunks(result, nresults, opt ? opt : &pset.popt, fout, false, pset.logfile);
 	fflush(fout);
 	if (ferror(fout))
 	{
@@ -958,7 +959,7 @@ PrintQueryResult(PGresult *result, bool last,
 			else if (last && pset.crosstab_flag)
 				success = PrintResultInCrosstab(result);
 			else if (last || pset.show_all_results)
-				success = PrintQueryTuples(result, opt, printQueryFout);
+				success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout);
 			else
 				success = true;
 
@@ -1369,6 +1370,54 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	return OK;
 }
 
+/*
+ * Open the \g output stream if it is needed and not already open.
+ *
+ * A stream is opened only when a \g file or program is set and "result"
+ * has a status that produces data for it.  On success, *gfile_fout and
+ * *is_pipe describe the opened stream; for a pipe, the SIGPIPE trap is
+ * disabled until CloseGOutput() restores it.
+ *
+ * Returns false if the stream could not be opened, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+	ExecStatusType status = PQresultStatus(result);
+
+	if (pset.gfname != NULL &&	/* there is a \g file or program */
+		*gfile_fout == NULL &&	/* and it's not already opened */
+		(status == PGRES_TUPLES_OK ||
+		 status == PGRES_SINGLE_TUPLE ||
+		 status == PGRES_COPY_OUT))
+	{
+		if (!openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+			return false;
+		/* check the flag, not the pointer, which is never NULL */
+		if (*is_pipe)
+			disable_sigpipe_trap();
+	}
+	return true;
+}
+
+/*
+ * Close the \g output stream if one was opened, restoring the SIGPIPE
+ * trap if it was a pipe.
+ */
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+	if (gfile_fout)
+	{
+		if (is_pipe)
+		{
+			pclose(gfile_fout);
+			restore_sigpipe_trap();
+		}
+		else
+			fclose(gfile_fout);
+	}
+}
 
 /*
  * ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1400,10 +1442,16 @@ ExecQueryAndProcessResults(const char *query,
 	bool		success;
 	instr_time	before,
 				after;
+	int fetch_count = pset.fetch_count;
 	PGresult   *result;
+
 	FILE	   *gfile_fout = NULL;
 	bool		gfile_is_pipe = false;
 
+	PGresult   **result_array = NULL; /* to collect results in single row mode */
+	int64		total_tuples = 0;
+	int			ntuples;
+
 	if (timing)
 		INSTR_TIME_SET_CURRENT(before);
 
@@ -1424,6 +1472,33 @@ ExecQueryAndProcessResults(const char *query,
 		return -1;
 	}
 
+	/*
+	 * If FETCH_COUNT is set and the context allows it, use single-row mode
+	 * to fetch results, so that no more than FETCH_COUNT rows are held in
+	 * memory at a time.
+	 */
+	if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+		&& !pset.gset_prefix && pset.show_all_results)
+	{
+		/*
+		 * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false,
+		 * since we would need to accumulate all rows before knowing
+		 * whether they need to be discarded or displayed, which contradicts
+		 * FETCH_COUNT.
+		 */
+		if (!PQsetSingleRowMode(pset.db))
+		{
+			pg_log_warning("fetching results in single row mode is unavailable");
+			fetch_count = 0;
+		}
+		else
+		{
+			result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*));
+		}
+	}
+	else
+		fetch_count = 0;		/* disable single-row mode */
+
 	/*
 	 * If SIGINT is sent while the query is processing, the interrupt will be
 	 * consumed.  The user's intention, though, is to cancel the entire watch
@@ -1443,6 +1518,7 @@ ExecQueryAndProcessResults(const char *query,
 		ExecStatusType result_status;
 		PGresult   *next_result;
 		bool		last;
+		bool		partial_display = false;
 
 		if (!AcceptResult(result, false))
 		{
@@ -1569,6 +1645,118 @@ ExecQueryAndProcessResults(const char *query,
 			success &= HandleCopyResult(&result, copy_stream);
 		}
 
+		if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE)
+		{
+			FILE	   *tuples_fout = printQueryFout;
+			printQueryOpt my_popt = pset.popt;
+			bool		fetch_ok;
+
+			ntuples = 0;
+			partial_display = true;
+
+			/*
+			 * Track failures in a local flag: assigning to "success" here
+			 * would hide a failure already recorded for a previous result.
+			 */
+			fetch_ok = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+			if (gfile_fout)
+				tuples_fout = gfile_fout;
+
+			/* initialize print options for partial table output */
+			my_popt.topt.start_table = true;
+			my_popt.topt.stop_table = false;
+			my_popt.topt.prior_records = 0;
+
+			while (fetch_ok)
+			{
+				ExecStatusType row_status;
+
+				result_array[ntuples++] = result;
+				if (ntuples == fetch_count)
+				{
+					/* TODO: handle paging */
+					/* display the current chunk of results */
+					PrintQueryTuples((const PGresult **) result_array, ntuples,
+									 &my_popt, tuples_fout);
+					/* clear and reuse result_array */
+					for (int i = 0; i < ntuples; i++)
+						PQclear(result_array[i]);
+					/* after the first result set, disallow header decoration */
+					my_popt.topt.start_table = false;
+					my_popt.topt.prior_records += ntuples;
+					total_tuples += ntuples;
+					ntuples = 0;
+				}
+
+				result = PQgetResult(pset.db);
+				if (result == NULL)
+				{
+					/*
+					 * Unexpected end: a PGRES_TUPLES_OK result with zero
+					 * tuples should finish the row-by-row sequence.
+					 */
+					fetch_ok = false;
+					break;
+				}
+
+				row_status = PQresultStatus(result);
+				if (row_status == PGRES_TUPLES_OK)
+				{
+					/*
+					 * The last row has been read. Display the last chunk of
+					 * results and the footer.  The final zero-row result is
+					 * added to the chunk so that it is freed with the rows and
+					 * so that column descriptions are available even when no
+					 * row is left.  It fits, since ntuples < fetch_count here.
+					 */
+					total_tuples += ntuples;
+					result_array[ntuples++] = result;
+					result = NULL;
+					my_popt.topt.stop_table = true;
+					PrintQueryTuples((const PGresult **) result_array, ntuples,
+									 &my_popt, tuples_fout);
+					for (int i = 0; i < ntuples; i++)
+						PQclear(result_array[i]);
+					ntuples = 0;
+
+					{
+						/*
+						 * fake SetResultVariables() as in ExecQueryUsingCursor().
+						 */
+						char		buf[32];
+
+						SetVariable(pset.vars, "ERROR", "false");
+						SetVariable(pset.vars, "SQLSTATE", "00000");
+						snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+						SetVariable(pset.vars, "ROW_COUNT", buf);
+					}
+					break;
+				}
+				else if (row_status != PGRES_SINGLE_TUPLE)
+				{
+					/*
+					 * The query failed after some rows were received.  Report
+					 * the error; rows not yet displayed are discarded below.
+					 */
+					AcceptResult(result, true);
+					PQclear(result);
+					result = NULL;
+					fetch_ok = false;
+					break;
+				}
+			}
+
+			/* free the rows left undisplayed if the loop was exited early */
+			for (int i = 0; i < ntuples; i++)
+				PQclear(result_array[i]);
+			ntuples = 0;
+
+			if (!fetch_ok)
+				success = false;
+		}
+		else
+			partial_display = false;
+
 		/*
 		 * Check PQgetResult() again.  In the typical case of a single-command
 		 * string, it will return NULL.  Otherwise, we'll have other results
@@ -1597,7 +1752,7 @@ ExecQueryAndProcessResults(const char *query,
 		}
 
 		/* this may or may not print something depending on settings */
-		if (result != NULL)
+		if (result != NULL && !partial_display)
 		{
 			/*
 			 * If results need to be printed into the file specified by \g,
@@ -1606,25 +1761,14 @@ ExecQueryAndProcessResults(const char *query,
 			 * tuple output, but it's still used for status output.
 			 */
 			FILE	   *tuples_fout = printQueryFout;
 			bool		do_print = true;
 
-			if (PQresultStatus(result) == PGRES_TUPLES_OK &&
-				pset.gfname)
-			{
-				if (gfile_fout == NULL)
-				{
-					if (openQueryOutputFile(pset.gfname,
-											&gfile_fout, &gfile_is_pipe))
-					{
-						if (gfile_is_pipe)
-							disable_sigpipe_trap();
-					}
-					else
-						success = do_print = false;
-				}
+			/* never reset "success" here: an earlier result may have failed */
+			if (!SetupGOutput(result, &gfile_fout, &gfile_is_pipe))
+				success = do_print = false;
+			else if (gfile_fout)
 				tuples_fout = gfile_fout;
-			}
 			if (do_print)
 				success &= PrintQueryResult(result, last, opt,
 											tuples_fout, printQueryFout);
 		}
@@ -1643,17 +1783,10 @@ ExecQueryAndProcessResults(const char *query,
 		}
 	}
 
-	/* close \g file if we opened it */
-	if (gfile_fout)
-	{
-		if (gfile_is_pipe)
-		{
-			pclose(gfile_fout);
-			restore_sigpipe_trap();
-		}
-		else
-			fclose(gfile_fout);
-	}
+	CloseGOutput(gfile_fout, gfile_is_pipe);
+
+	if (result_array)
+		pg_free(result_array);
 
 	/* may need this to recover from conn loss during COPY */
 	if (!CheckConnection())
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 3396f9b462..d8f0a29773 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3533,17 +3533,42 @@ printTable(const printTableContent *cont,
 void
 printQuery(const PGresult *result, const printQueryOpt *opt,
 		   FILE *fout, bool is_pager, FILE *flog)
+{
+	printQueryChunks(&result, 1, opt, fout, is_pager, flog);
+}
+
+/*
+ * Print the results of a query that may have been obtained by a
+ * succession of calls to PQgetResult in single-row mode.
+ *
+ * results: array of results of a successful query. They must have the same columns.
+ * nresults: number of entries in results[]
+ * opt: formatting options
+ * fout: where to print to
+ * is_pager: true if caller has already redirected fout to be a pager pipe
+ * flog: if not null, also print the data there (for --log-file option)
+ */
+void
+printQueryChunks(const PGresult *results[], int nresults, const printQueryOpt *opt,
+				 FILE *fout, bool is_pager, FILE *flog)
 {
 	printTableContent cont;
 	int			i,
 				r,
 				c;
+	int			nrows = 0;		/* total number of rows */
+	int			ri;				/* index into results[] */
 
 	if (cancel_pressed)
 		return;
 
+	for (ri = 0; ri < nresults; ri++)
+	{
+		nrows += PQntuples(results[ri]);
+	}
+
 	printTableInit(&cont, &opt->topt, opt->title,
-				   PQnfields(result), PQntuples(result));
+				   (nresults > 0) ? PQnfields(results[0]) : 0, nrows);
 
 	/* Assert caller supplied enough translate_columns[] entries */
 	Assert(opt->translate_columns == NULL ||
@@ -3551,34 +3576,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
 
 	for (i = 0; i < cont.ncolumns; i++)
 	{
-		printTableAddHeader(&cont, PQfname(result, i),
+		printTableAddHeader(&cont, PQfname(results[0], i),
 							opt->translate_header,
-							column_type_alignment(PQftype(result, i)));
+							column_type_alignment(PQftype(results[0], i)));
 	}
 
 	/* set cells */
-	for (r = 0; r < cont.nrows; r++)
+	for (ri = 0; ri < nresults; ri++)
 	{
-		for (c = 0; c < cont.ncolumns; c++)
+		for (r = 0; r < PQntuples(results[ri]); r++)
 		{
-			char	   *cell;
-			bool		mustfree = false;
-			bool		translate;
-
-			if (PQgetisnull(result, r, c))
-				cell = opt->nullPrint ? opt->nullPrint : "";
-			else
+			for (c = 0; c < cont.ncolumns; c++)
 			{
-				cell = PQgetvalue(result, r, c);
-				if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+				char	   *cell;
+				bool		mustfree = false;
+				bool		translate;
+
+				if (PQgetisnull(results[ri], r, c))
+					cell = opt->nullPrint ? opt->nullPrint : "";
+				else
 				{
-					cell = format_numeric_locale(cell);
-					mustfree = true;
+					cell = PQgetvalue(results[ri], r, c);
+					if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+					{
+						cell = format_numeric_locale(cell);
+						mustfree = true;
+					}
 				}
-			}
 
-			translate = (opt->translate_columns && opt->translate_columns[c]);
-			printTableAddCell(&cont, cell, translate, mustfree);
+				translate = (opt->translate_columns && opt->translate_columns[c]);
+				printTableAddCell(&cont, cell, translate, mustfree);
+			}
 		}
 	}
 
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index 54f783c907..3befc41bdc 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -220,7 +220,10 @@ extern void printTableCleanup(printTableContent *const content);
 extern void printTable(const printTableContent *cont,
 					   FILE *fout, bool is_pager, FILE *flog);
 extern void printQuery(const PGresult *result, const printQueryOpt *opt,
 					   FILE *fout, bool is_pager, FILE *flog);
+extern void printQueryChunks(const PGresult *results[], int nresults,
+							 const printQueryOpt *opt,
+							 FILE *fout, bool is_pager, FILE *flog);
 
 extern char column_type_alignment(Oid);
 

Reply via email to