I wrote: > Here's a POC patch implementing row-by-row fetching.
PFA an updated patch. Best regards, -- Daniel Vérité https://postgresql.verite.pro/ Twitter: @DanielVerite
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index f907f5d4e8..ad5e8a5de9 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error) { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: case PGRES_EMPTY_QUERY: case PGRES_COPY_IN: case PGRES_COPY_OUT: @@ -675,13 +676,13 @@ PrintNotifications(void) * Returns true if successful, false otherwise. */ static bool -PrintQueryTuples(const PGresult *result, const printQueryOpt *opt, +PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt, FILE *printQueryFout) { bool ok = true; FILE *fout = printQueryFout ? printQueryFout : pset.queryFout; - printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile); + printQueryChunks(result, nresults, opt ? opt : &pset.popt, fout, false, pset.logfile); fflush(fout); if (ferror(fout)) { @@ -958,7 +959,7 @@ PrintQueryResult(PGresult *result, bool last, else if (last && pset.crosstab_flag) success = PrintResultInCrosstab(result); else if (last || pset.show_all_results) - success = PrintQueryTuples(result, opt, printQueryFout); + success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout); else success = true; @@ -1371,6 +1372,47 @@ DescribeQuery(const char *query, double *elapsed_msec) return OK; } +/* + * Check if an output stream for \g needs to be opened, and if + * yes, open it. + * Return false if an error occurred, true otherwise. + */ +static bool +SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe) +{ + ExecStatusType status = PQresultStatus(result); + if (pset.gfname != NULL && /* there is a \g file or program */ + *gfile_fout == NULL && /* and it's not already opened */ + (status == PGRES_TUPLES_OK || + status == PGRES_SINGLE_TUPLE || + status == PGRES_COPY_OUT)) + { + if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe)) + { + if (is_pipe) + disable_sigpipe_trap(); + } + else + return false; + } + return true; +} + +static void +CloseGOutput(FILE *gfile_fout, bool is_pipe) +{ + /* close \g file if we opened it */ + if (gfile_fout) + { + if (is_pipe) + { + pclose(gfile_fout); + restore_sigpipe_trap(); + } + else + fclose(gfile_fout); + } +} /* * ExecQueryAndProcessResults: utility function for use by SendQuery() @@ -1402,10 +1444,16 @@ ExecQueryAndProcessResults(const char *query, bool success; instr_time before, after; + int fetch_count = pset.fetch_count; PGresult *result; + FILE *gfile_fout = NULL; bool gfile_is_pipe = false; + PGresult **result_array = NULL; /* to collect results in single row mode */ + int64 total_tuples = 0; + int ntuples; + if (timing) INSTR_TIME_SET_CURRENT(before); else @@ -1428,6 +1476,33 @@ ExecQueryAndProcessResults(const char *query, return -1; } + /* + * If FETCH_COUNT is set and the context allows it, use the single row + * mode to fetch results and have no more than FETCH_COUNT rows in + * memory. + */ + if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch + && !pset.gset_prefix && pset.show_all_results) + { + /* + * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false, + * since we would need to accumulate all rows before knowing + * whether they need to be discarded or displayed, which contradicts + * FETCH_COUNT. + */ + if (!PQsetSingleRowMode(pset.db)) + { + pg_log_warning("fetching results in single row mode is unavailable"); + fetch_count = 0; + } + else + { + result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*)); + } + } + else + fetch_count = 0; /* disable single-row mode */ + /* * If SIGINT is sent while the query is processing, the interrupt will be * consumed. The user's intention, though, is to cancel the entire watch @@ -1447,6 +1522,7 @@ ExecQueryAndProcessResults(const char *query, ExecStatusType result_status; PGresult *next_result; bool last; + bool partial_display = false; if (!AcceptResult(result, false)) { @@ -1573,6 +1649,96 @@ ExecQueryAndProcessResults(const char *query, success &= HandleCopyResult(&result, copy_stream); } + if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE) + { + FILE *tuples_fout = printQueryFout; + printQueryOpt my_popt = pset.popt; + bool is_pager = false; + int flush_error = 0; + + ntuples = 0; + total_tuples = 0; + partial_display = true; + + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) + tuples_fout = gfile_fout; + + /* initialize print options for partial table output */ + my_popt.topt.start_table = true; + my_popt.topt.stop_table = false; + my_popt.topt.prior_records = 0; + + while (success) + { + result_array[ntuples++] = result; + if (ntuples == fetch_count) + { + /* pager: open at most once per resultset */ + if (tuples_fout == stdout && !is_pager) + { + tuples_fout = PageOutput(INT_MAX, &(my_popt.topt)); + is_pager = true; + } + /* display the current chunk of results unless the output stream is not working */ + if (!flush_error) + { + PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout); + flush_error = fflush(tuples_fout); + } + /* clear and reuse result_array */ + for (int i=0; i < ntuples; i++) + PQclear(result_array[i]); + /* after the first result set, disallow header decoration */ + my_popt.topt.start_table = false; + my_popt.topt.prior_records += ntuples; + total_tuples += ntuples; + ntuples = 0; + } + + result = PQgetResult(pset.db); + if (result == NULL) + { + /* + * Error. We expect a PGRES_TUPLES_OK result with + * zero tuple in it to finish the row-by-row sequence. + */ + success = false; + break; + } + + if (PQresultStatus(result) == PGRES_TUPLES_OK) + { + /* TODO: merge this block with the code above? */ + /* + * The last row has been read. Display the last chunk of + * results and the footer. + */ + my_popt.topt.stop_table = true; + if (!flush_error) + { + PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout); + flush_error = fflush(tuples_fout); + } + for (int i=0; i < ntuples; i++) + PQclear(result_array[i]); + total_tuples += ntuples; + ntuples = 0; + + if (is_pager) + { + ClosePager(tuples_fout); + } + + result = NULL; + /*partial_display_rowcount = total_tuples;*/ + break; + } + } + } + else + partial_display = false; + /* * Check PQgetResult() again. In the typical case of a single-command * string, it will return NULL. Otherwise, we'll have other results @@ -1601,7 +1767,7 @@ ExecQueryAndProcessResults(const char *query, } /* this may or may not print something depending on settings */ - if (result != NULL) + if (result != NULL && !partial_display) { /* * If results need to be printed into the file specified by \g, @@ -1610,33 +1776,32 @@ ExecQueryAndProcessResults(const char *query, * tuple output, but it's still used for status output. */ FILE *tuples_fout = printQueryFout; - bool do_print = true; - - if (PQresultStatus(result) == PGRES_TUPLES_OK && - pset.gfname) - { - if (gfile_fout == NULL) - { - if (openQueryOutputFile(pset.gfname, - &gfile_fout, &gfile_is_pipe)) - { - if (gfile_is_pipe) - disable_sigpipe_trap(); - } - else - success = do_print = false; - } + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) tuples_fout = gfile_fout; - } - if (do_print) + if (success) success &= PrintQueryResult(result, last, opt, tuples_fout, printQueryFout); } /* set variables on last result if all went well */ if (!is_watch && last && success) + { SetResultVariables(result, true); + if (partial_display) + { + /* + * fake SetResultVariables() as in ExecQueryUsingCursor(). + */ + char buf[32]; + SetVariable(pset.vars, "ERROR", "false"); + SetVariable(pset.vars, "SQLSTATE", "00000"); + snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples); + SetVariable(pset.vars, "ROW_COUNT", buf); + } + } + ClearOrSaveResult(result); result = next_result; @@ -1647,17 +1812,10 @@ ExecQueryAndProcessResults(const char *query, } } - /* close \g file if we opened it */ - if (gfile_fout) - { - if (gfile_is_pipe) - { - pclose(gfile_fout); - restore_sigpipe_trap(); - } - else - fclose(gfile_fout); - } + CloseGOutput(gfile_fout, gfile_is_pipe); + + if (result_array) + pg_free(result_array); /* may need this to recover from conn loss during COPY */ if (!CheckConnection()) diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c index 3396f9b462..d8f0a29773 100644 --- a/src/fe_utils/print.c +++ b/src/fe_utils/print.c @@ -3533,17 +3533,42 @@ printTable(const printTableContent *cont, void printQuery(const PGresult *result, const printQueryOpt *opt, FILE *fout, bool is_pager, FILE *flog) +{ + printQueryChunks(&result, 1, opt, fout, is_pager, flog); +} + +/* + * Print the results of a query that may have been obtained by a + * succession of calls to PQgetResult in single-row mode. + * + * results: array of results of a successful query. They must have the same columns. + * nbresults: size of results + * opt: formatting options + * fout: where to print to + * is_pager: true if caller has already redirected fout to be a pager pipe + * flog: if not null, also print the data there (for --log-file option) + */ +void +printQueryChunks(const PGresult *results[], int nresults, const printQueryOpt *opt, + FILE *fout, bool is_pager, FILE *flog) { printTableContent cont; int i, r, c; + int nrows = 0; /* total number of rows */ + int ri; /* index into results[] */ if (cancel_pressed) return; + for (ri = 0; ri < nresults; ri++) + { + nrows += PQntuples(results[ri]); + } + printTableInit(&cont, &opt->topt, opt->title, - PQnfields(result), PQntuples(result)); + (nresults > 0) ? PQnfields(results[0]) : 0, nrows); /* Assert caller supplied enough translate_columns[] entries */ Assert(opt->translate_columns == NULL || @@ -3551,34 +3576,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt, for (i = 0; i < cont.ncolumns; i++) { - printTableAddHeader(&cont, PQfname(result, i), + printTableAddHeader(&cont, PQfname(results[0], i), opt->translate_header, - column_type_alignment(PQftype(result, i))); + column_type_alignment(PQftype(results[0], i))); } /* set cells */ - for (r = 0; r < cont.nrows; r++) + for (ri = 0; ri < nresults; ri++) { - for (c = 0; c < cont.ncolumns; c++) + for (r = 0; r < PQntuples(results[ri]); r++) { - char *cell; - bool mustfree = false; - bool translate; - - if (PQgetisnull(result, r, c)) - cell = opt->nullPrint ? opt->nullPrint : ""; - else + for (c = 0; c < cont.ncolumns; c++) { - cell = PQgetvalue(result, r, c); - if (cont.aligns[c] == 'r' && opt->topt.numericLocale) + char *cell; + bool mustfree = false; + bool translate; + + if (PQgetisnull(results[ri], r, c)) + cell = opt->nullPrint ? opt->nullPrint : ""; + else { - cell = format_numeric_locale(cell); - mustfree = true; + cell = PQgetvalue(results[ri], r, c); + if (cont.aligns[c] == 'r' && opt->topt.numericLocale) + { + cell = format_numeric_locale(cell); + mustfree = true; + } } - } - translate = (opt->translate_columns && opt->translate_columns[c]); - printTableAddCell(&cont, cell, translate, mustfree); + translate = (opt->translate_columns && opt->translate_columns[c]); + printTableAddCell(&cont, cell, translate, mustfree); + } } } diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h index 54f783c907..3befc41bdc 100644 --- a/src/include/fe_utils/print.h +++ b/src/include/fe_utils/print.h @@ -220,7 +220,10 @@ extern void printTableCleanup(printTableContent *const content); extern void printTable(const printTableContent *cont, FILE *fout, bool is_pager, FILE *flog); extern void printQuery(const PGresult *result, const printQueryOpt *opt, - FILE *fout, bool is_pager, FILE *flog); + FILE *fout, bool is_pager, FILE *flog); +extern void printQueryChunks(const PGresult *results[], int nresults, + const printQueryOpt *opt, + FILE *fout, bool is_pager, FILE *flog); extern char column_type_alignment(Oid);