The row-processor API is now in 9.2, but it solves only the "different-row-storage" problem, not the "one-row-at-a-time" problem, as libpq is still in control until all rows are received.
This means libpq still cannot be used to implement the iterative result processing that almost all high-level languages use.

We discussed a potential API for fetching a single row at a time, but did not reach a conclusion.  The basic arguments were:

1) Tom: PQisBusy() must keep its current behaviour.  Thus PQgetResult() must also keep its current behaviour:

   * PQisBusy() -> 0: need to call PQgetResult(), which returns PGresult
   * PQisBusy() -> 1: need to call PQconsumeInput()
   * PQisBusy() must be callable several times in a row, thus be stateless from the client's POV.

2) Me: behaviour must not be controlled by a callback, but by client code that uses PQgetResult() + PQisBusy().

Now, looking at the problem with some perspective, the solution is obvious: when in single-row mode, PQgetResult() must return a proper PGresult for that single row.  And everything else follows from that.

Such an API is implemented in the attached patch:

* PQsetSingleRowMode(conn): sets single-row mode.

* PQisBusy(): stops after each row in single-row mode and sets PGASYNC_ROW_READY, thus keeping the property of being repeatedly callable.

* PQgetResult(): returns a copy of the row if PGASYNC_ROW_READY.  Sets the row's resultStatus to PGRES_SINGLE_TUPLE.  This needs to be different from PGRES_TUPLES_OK so that the end of the resultset can be detected.

* PQgetRowData(): can be called instead of PQgetResult() to get raw row data in the buffer, for more efficient processing.  This is an optional feature that provides the original row-callback promise of avoiding unnecessary row data copies.

* Although PQgetRowData() makes the callback API unnecessary, it is still fully compatible with it - the callback should not see any difference whether the resultset is processed in single-row mode or in the old single-PGresult mode.  Unless it wants to - it can check PGRES_TUPLES_OK vs. PGRES_SINGLE_TUPLE.

There is some duplicate code here that can be refactored (callback exec), but I did not do it yet to avoid affecting existing code too much.

-- 
marko

PS. If I squint, it looks like a fix of the existing API rather than a new feature, so perhaps it can still fit into 9.2?
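
To make the state transitions described above easier to follow, here is a rough toy model in plain C.  It is not libpq code and does not touch the network: every toy_* name and Toy* type is a hypothetical stand-in invented for this illustration, and the "wire" is just a counter of rows that are assumed to have arrived already.  It only sketches the contract: parsing stops after each row in single-row mode, the is-busy check can be repeated without losing a row, and the get-result call hands out one single-tuple result per row followed by a final tuples-ok result.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for the states named in the patch; this is NOT libpq code. */
typedef enum { TOY_IDLE, TOY_BUSY, TOY_ROW_READY, TOY_READY } ToyAsync;
typedef enum { TOY_NONE, TOY_SINGLE_TUPLE, TOY_TUPLES_OK } ToyStatus;

typedef struct {
    ToyAsync async;
    int single_row_mode;
    int rows_on_wire;   /* rows assumed received but not yet parsed */
    int rows_parsed;    /* rows parsed so far */
} ToyConn;

typedef struct {
    ToyStatus status;
    int ntuples;        /* rows carried by this result */
    int row;            /* row index for TOY_SINGLE_TUPLE, else -1 */
} ToyResult;

/* Models PQsendQuery(): every new query starts in buffered mode. */
static void toy_send_query(ToyConn *c, int nrows)
{
    c->async = TOY_BUSY;
    c->single_row_mode = 0;
    c->rows_on_wire = nrows;
    c->rows_parsed = 0;
}

/* Models PQsetSingleRowMode(): only allowed before any row was parsed. */
static int toy_set_single_row_mode(ToyConn *c)
{
    if (c == NULL || c->async != TOY_BUSY || c->rows_parsed > 0)
        return 0;
    c->single_row_mode = 1;
    return 1;
}

/* Models parseInput(): in single-row mode it stops after each row. */
static void toy_parse_input(ToyConn *c)
{
    if (c->async != TOY_BUSY)
        return;     /* covers ROW_READY: the pending row must be fetched first */
    while (c->rows_on_wire > 0) {
        c->rows_on_wire--;
        c->rows_parsed++;
        if (c->single_row_mode) {
            c->async = TOY_ROW_READY;
            return;
        }
    }
    c->async = TOY_READY;   /* end of resultset */
}

/* Models PQisBusy(): repeated calls never skip a row. */
static int toy_is_busy(ToyConn *c)
{
    toy_parse_input(c);
    return c->async == TOY_BUSY;
}

/* Models PQgetResult(): one result per row, then the final status. */
static ToyResult toy_get_result(ToyConn *c)
{
    ToyResult r = { TOY_NONE, 0, -1 };

    toy_parse_input(c);
    if (c->async == TOY_ROW_READY) {
        r.status = TOY_SINGLE_TUPLE;
        r.ntuples = 1;
        r.row = c->rows_parsed - 1;
        c->async = TOY_BUSY;    /* allow parsing to proceed */
    } else if (c->async == TOY_READY) {
        r.status = TOY_TUPLES_OK;
        r.ntuples = c->single_row_mode ? 0 : c->rows_parsed;
        c->async = TOY_IDLE;
    }
    return r;
}

/* Client loop: returns how many single-row results were delivered. */
int toy_count_single_rows(int nrows, int single_row)
{
    ToyConn c;
    ToyResult r;
    int seen = 0;

    toy_send_query(&c, nrows);
    if (single_row) {
        int ok = toy_set_single_row_mode(&c);
        assert(ok == 1);
        (void) ok;
    }
    for (;;) {
        int busy = toy_is_busy(&c);
        busy |= toy_is_busy(&c);    /* second call must not consume a row */
        assert(!busy);              /* the toy model has all data buffered */
        (void) busy;
        r = toy_get_result(&c);
        if (r.status != TOY_SINGLE_TUPLE)
            break;
        assert(r.row == seen);
        seen++;
    }
    assert(r.status == TOY_TUPLES_OK);
    return seen;
}
```

In this model, toy_count_single_rows(3, 1) sees three single-tuple results before the final one, while toy_count_single_rows(3, 0) sees none, because all rows end up in the one final result as in the old buffered mode.  A real client would additionally wait on the socket and call PQconsumeInput() whenever PQisBusy() returns 1, which the toy model skips.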
commit 4114613 (HEAD, single-row) Author: Marko Kreen <mark...@gmail.com> Date: Sat Apr 7 15:05:01 2012 +0300 Single-row based processing diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 5c5dd68..0ea2c1f 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -4018,6 +4018,75 @@ PGresult *PQgetResult(PGconn *conn); </note> </listitem> </varlistentry> + + <varlistentry id="libpq-pqsetsinglerowmode"> + <term> + <function>PQsetSingleRowMode</function> + <indexterm> + <primary>PQsetSingleRowMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Instead buffering all rows in <structname>PGresult</structname> + until full resultset has arrived, this changes resultset processing + to return rows as soon as they arrive from network. + +<synopsis> +int PQsetSingleRowMode(PGconn *conn); +</synopsis> + </para> + + <para> + The mode can be changed directly after + <function>PQsendQuery</function>, + <function>PQsendQueryParams</function>, + <function>PQsendQueryPrepared</function> call, and before + any result rows have arrived from network. Then this functions + changes mode and returns 1. Otherwise the mode stays unchanged + and this functions returns 0. + </para> + + <para> + The rows returned have PQresultStatus() of <literal>PGRES_SINGLE_TUPLE</literal>. + There will be final PGresult that has either <literal>PGRES_TUPLES_OK</literal> + or <literal>PGRES_FATAL_ERROR</literal> result status. In case + of error status, the actual query failed in the middle and received rows + should be dropped. + </para> + + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqgetrowdata"> + <term> + <function>PQgetRowData</function> + <indexterm> + <primary>PQgetRowData</primary> + </indexterm> + </term> + + <listitem> + <para> + In single row mode it is possible to get row data directly, + without constructing <structname>PGresult</structname> for + each row. 
+ +<synopsis> +int PQgetRowData(PGconn *conn, PGresult **hdr, PGdataValue **columns); +</synopsis> + </para> + + <para> + It can be called everywhere <function>PQgetResult</function> can. + It returns 1 and fills pointers if there is row data avilable. + It returns 0 otherwise. Then <function>PQgetResult</function> + should be called to get final status. + </para> + </listitem> + </varlistentry> + </variablelist> </para> diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1251455..a228a71 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -163,3 +163,5 @@ PQlibVersion 160 PQsetRowProcessor 161 PQgetRowProcessor 162 PQskipResult 163 +PQsetSingleRowMode 164 +PQgetRowData 165 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index badc0b3..ba9215b 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1344,6 +1344,9 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result = NULL; + /* reset single-row processing */ + conn->singleRowMode = false; + /* ready to send command message */ return true; } @@ -1548,6 +1551,166 @@ pqHandleSendFailure(PGconn *conn) } /* + * Set row-by-row processing mode. + */ +int +PQsetSingleRowMode(PGconn *conn) +{ + /* + * avoid setting the flag in inappropriate time + */ + + if (!conn) + return 0; + if (conn->asyncStatus != PGASYNC_BUSY) + return 0; + if (conn->queryclass != PGQUERY_SIMPLE && conn->queryclass != PGQUERY_EXTENDED) + return 0; + if (conn->result) + return 0; + + /* set flag */ + conn->singleRowMode = true; + return 1; +} + +/* + * Create result that contains current row pointed by rowBuf. 
+ */ +static PGresult * +pqSingleRowResult(PGconn *conn) +{ + PGresult *res; + const char *errmsg = NULL; + + /* Copy row header */ + res = PQcopyResult(conn->result, PG_COPYRES_ATTRS); + if (!res) + goto nomem; + + /* Set special status */ + res->resultStatus = PGRES_SINGLE_TUPLE; + + /* + * Use callback to fill the row. + */ + + switch ((*conn->rowProcessor) (res, conn->rowBuf, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return res; + + case -1: + /* error, report the errmsg below */ + break; + + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + + /* free copied PGresult */ + PQclear(res); + +nomem: + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); + pqSaveErrorResult(conn); + + /* + * Fall back to standard PQgetResult() behaviour + */ + return pqPrepareAsyncResult(conn); +} + +/* + * Get raw row data from network buffer. + * + * It duplicates the flush/read logic of PQgetResult() to be able + * to work on sync connection. It does not return any error state, + * instead it leaves that to actual PQgetResult(). + * + * Returns: 1 - have row data, 0 - do not have it + */ +int +PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **cols) +{ + if (!conn) + return 0; + + /* Parse any available data, if our state permits. */ + parseInput(conn); + + /* If not ready to return something, block until we are. 
*/ + while (conn->asyncStatus == PGASYNC_BUSY) + { + int flushResult; + + /* + * If data remains unsent, send it. Else we might be waiting for the + * result of a command the backend hasn't even got yet. + */ + while ((flushResult = pqFlush(conn)) > 0) + { + if (pqWait(FALSE, TRUE, conn)) + { + flushResult = -1; + break; + } + } + + /* Wait for some more data, and load it. */ + if (flushResult || + pqWait(TRUE, FALSE, conn) || + pqReadData(conn) < 0) + { + /* + * conn->errorMessage has been set by pqWait or pqReadData. We + * want to append it to any already-received error message. + */ + pqSaveErrorResult(conn); + + /* Make PQgetResult() return the error */ + conn->asyncStatus = PGASYNC_READY; + break; + } + + /* Parse it. */ + parseInput(conn); + } + + /* should PQgetResult() be called instead? */ + if (conn->asyncStatus != PGASYNC_ROW_READY) + return 0; + + /* allow parsing to proceed */ + conn->asyncStatus = PGASYNC_BUSY; + + /* return pointers to current row */ + *hdrp = conn->result; + *cols = conn->rowBuf; + return 1; +} + +/* * Consume any available input from the backend * 0 return: some kind of trouble * 1 return: no problem @@ -1594,6 +1757,10 @@ PQconsumeInput(PGconn *conn) static void parseInput(PGconn *conn) { + /* special case: there is data to parse, but we must not do it yet. */ + if (conn->asyncStatus == PGASYNC_ROW_READY) + return; + if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3) pqParseInput3(conn); else @@ -1684,6 +1851,12 @@ PQgetResult(PGconn *conn) /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; break; + case PGASYNC_ROW_READY: + /* return copy of current row */ + res = pqSingleRowResult(conn); + /* Set the state back to BUSY, allowing parsing to proceed. 
*/ + conn->asyncStatus = PGASYNC_BUSY; + break; case PGASYNC_COPY_IN: if (conn->result && conn->result->resultStatus == PGRES_COPY_IN) res = pqPrepareAsyncResult(conn); @@ -2525,6 +2698,9 @@ PQfn(PGconn *conn, return NULL; } + /* reset single row mode */ + conn->singleRowMode = false; + if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3) return pqFunctionCall3(conn, fnid, result_buf, actual_result_len, diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 8dbd6b6..3b97720 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -943,6 +943,15 @@ getAnotherTuple(PGconn *conn, bool binary) */ conn->inStart = conn->inCursor; + /* + * On single-row processing, show that row is available. + */ + if (conn->singleRowMode) + { + conn->asyncStatus = PGASYNC_ROW_READY; + return EOF; + } + /* Pass the completed row values to rowProcessor */ errmsg = NULL; switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 173af2e..1700c67 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -784,6 +784,15 @@ getAnotherTuple(PGconn *conn, int msgLength) */ conn->inStart = conn->inCursor; + /* + * On single-row processing, show that row is available. 
+ */ + if (conn->singleRowMode) + { + conn->asyncStatus = PGASYNC_ROW_READY; + return EOF; + } + /* Pass the completed row values to rowProcessor */ errmsg = NULL; switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 67db611..d968c09 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -90,7 +90,8 @@ typedef enum * backend */ PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ - PGRES_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGRES_SINGLE_TUPLE /* PGresult for single tuple from bigger resultset */ } ExecStatusType; typedef enum @@ -406,6 +407,9 @@ extern int PQsendQueryPrepared(PGconn *conn, extern PGresult *PQgetResult(PGconn *conn); extern PGresult *PQskipResult(PGconn *conn); +extern int PQsetSingleRowMode(PGconn *conn); +extern int PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **columns); + /* Routines for managing an asynchronous query */ extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 4bc8926..c779381 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -219,7 +219,8 @@ typedef enum PGASYNC_READY, /* result ready for PQgetResult */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_ROW_READY /* single-row result ready for PQgetResult */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ @@ -407,6 +408,9 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ + /* process 
result row-by-row */ + bool singleRowMode; + /* Assorted state for SSL, GSS, etc */ #ifdef USE_SSL