The row-processor API is now in 9.2, but it solves only the
"different-row-storage" problem, not the "one-row-at-a-time"
problem, as libpq is still in control until all rows are received.
This means libpq still cannot be used to implement the iterative
result processing that almost all high-level languages use.
We discussed a potential API for fetching a single row at a time,
but did not reach a conclusion. The basic arguments were:
1) Tom: PQisBusy must keep current behaviour. Thus also PQgetResult()
must keep current behaviour:
* PQisBusy() -> 0: need to call PQgetResult(), which returns PGresult
* PQisBusy() -> 1: need to call PQconsumeInput()
* PQisBusy() must be callable several times in a row, and thus be
stateless from the client's POV.
2) Me: behaviour must not be controlled by a callback, but by client
code that uses PQgetResult() + PQisBusy().
Now, looking at the problem with some perspective, the solution
is obvious: when in single-row mode, PQgetResult() must return a
proper PGresult for that single row. Everything else follows from that.
Such an API is implemented in the attached patch:
* PQsetSingleRowMode(conn): sets single-row mode.
* PQisBusy(): stops after each row in single-row mode, sets PGASYNC_ROW_READY.
Thus keeping the property of being repeatedly callable.
* PQgetResult(): returns a copy of the row if the connection state is
PGASYNC_ROW_READY. Sets the row's resultStatus to PGRES_SINGLE_TUPLE.
This needs to be different from PGRES_TUPLES_OK so that the end of
the resultset can be detected.
* PQgetRowData(): can be called instead of PQgetResult() to get raw row
data in the buffer, for more efficient processing. This is an optional
feature that delivers the original row-callback promise of avoiding
unnecessary row data copying.
* Although PQgetRowData() makes the callback API unnecessary, it is
still fully compatible with it: the callback should not see any
difference whether the resultset is processed in single-row mode or
in the old single-PGresult mode. If it does want to know, it can check
PGRES_TUPLES_OK vs. PGRES_SINGLE_TUPLE.
There is some duplicate code here that can be refactored (callback exec),
but I did not do it yet to avoid affecting existing code too much.
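To make the state flow described above easier to follow, here is a
minimal self-contained sketch. It is a toy model, not libpq code:
ModelConn, ModelResult and all model_* functions and M_* constants are
hypothetical names invented for illustration, and the "network" is just
an int array. It assumes that all rows are already available, and only
shows that the busy check stays repeatable, that each row comes back
with its own status, and that a final zero-row result marks the end.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model only; none of these names exist in libpq. */
typedef enum { M_IDLE, M_BUSY, M_ROW_READY, M_READY } ModelAsync;
typedef enum { M_NONE, M_SINGLE_TUPLE, M_TUPLES_OK } ModelStatus;

typedef struct {
    const int *rows;       /* rows the pretend server will send */
    size_t     nrows;
    size_t     next;       /* index of the next row to parse */
    size_t     buffered;   /* rows accumulated in buffered mode */
    int        single_row; /* mode flag, reset by every new query */
    int        current;    /* row parked while in M_ROW_READY */
    ModelAsync async;
} ModelConn;

typedef struct {
    ModelStatus status;
    int         value;     /* row payload, valid for M_SINGLE_TUPLE */
    size_t      ntuples;
} ModelResult;

/* Starting a query always resets single-row mode. */
static void model_send_query(ModelConn *c, const int *rows, size_t nrows)
{
    c->rows = rows; c->nrows = nrows; c->next = 0; c->buffered = 0;
    c->single_row = 0; c->current = 0; c->async = M_BUSY;
}

/* Allowed only while busy and before any row has been parsed. */
static int model_set_single_row_mode(ModelConn *c)
{
    if (c == NULL || c->async != M_BUSY || c->next != 0)
        return 0;
    c->single_row = 1;
    return 1;
}

/* Parsing pauses in M_ROW_READY until the row has been fetched. */
static void model_parse(ModelConn *c)
{
    while (c->async == M_BUSY) {
        if (c->next == c->nrows) {
            c->async = M_READY;             /* end of resultset */
        } else if (c->single_row) {
            c->current = c->rows[c->next++];
            c->async = M_ROW_READY;         /* stop after one row */
        } else {
            c->next++;                      /* buffered mode: keep going */
            c->buffered++;
        }
    }
}

/* Repeatable: a second call in a row changes nothing. */
static int model_is_busy(ModelConn *c)
{
    model_parse(c);
    return c->async == M_BUSY;
}

static ModelResult model_get_result(ModelConn *c)
{
    ModelResult r = { M_NONE, 0, 0 };

    model_parse(c);
    if (c->async == M_ROW_READY) {
        r.status = M_SINGLE_TUPLE;
        r.value = c->current;
        r.ntuples = 1;
        c->async = M_BUSY;                  /* let parsing proceed */
    } else if (c->async == M_READY) {
        r.status = M_TUPLES_OK;             /* final result */
        r.ntuples = c->buffered;
        c->async = M_IDLE;
    }
    return r;
}

/* Client loop: sums row values, reports the final result's row count. */
static int model_demo_sum(int use_single_row, size_t *final_ntuples)
{
    static const int rows[] = { 10, 20, 30 };
    ModelConn c;
    int sum = 0;

    model_send_query(&c, rows, 3);
    if (use_single_row) {
        int ok = model_set_single_row_mode(&c);
        assert(ok == 1);
        (void) ok;
    }
    for (;;) {
        ModelResult r;

        (void) model_is_busy(&c);
        (void) model_is_busy(&c);           /* repeated call is harmless */
        r = model_get_result(&c);
        if (r.status != M_SINGLE_TUPLE) {
            *final_ntuples = r.ntuples;
            break;
        }
        sum += r.value;
    }
    return sum;
}
```

Calling model_demo_sum(1, &n) walks the rows one at a time and ends on a
zero-row final result, while model_demo_sum(0, &n) sees no per-row
results and gets all rows counted in the final one. A real client would
do the same loop with PQsendQuery(), PQsetSingleRowMode(), PQisBusy(),
PQconsumeInput() and PQgetResult().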
--
marko
PS. If I squint, it seems like a fix of the existing API instead of a
new feature, so perhaps it can still fit into 9.2?
commit 4114613 (HEAD, single-row)
Author: Marko Kreen <[email protected]>
Date: Sat Apr 7 15:05:01 2012 +0300
Single-row based processing
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 5c5dd68..0ea2c1f 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4018,6 +4018,75 @@ PGresult *PQgetResult(PGconn *conn);
</note>
</listitem>
</varlistentry>
+
+ <varlistentry id="libpq-pqsetsinglerowmode">
+ <term>
+ <function>PQsetSingleRowMode</function>
+ <indexterm>
+ <primary>PQsetSingleRowMode</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Instead of buffering all rows in <structname>PGresult</structname>
+ until the full resultset has arrived, this changes resultset processing
+ to return rows as soon as they arrive from the network.
+
+<synopsis>
+int PQsetSingleRowMode(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ The mode can be changed directly after
+ <function>PQsendQuery</function>,
+ <function>PQsendQueryParams</function>,
+ <function>PQsendQueryPrepared</function> call, and before
+ any result rows have arrived from the network. In that case this function
+ changes the mode and returns 1. Otherwise the mode stays unchanged
+ and this function returns 0.
+ </para>
+
+ <para>
+ The rows returned have PQresultStatus() of <literal>PGRES_SINGLE_TUPLE</literal>.
+ There will be a final PGresult that has either <literal>PGRES_TUPLES_OK</literal>
+ or <literal>PGRES_FATAL_ERROR</literal> result status. In case
+ of error status, the query failed midway and the rows already received
+ should be discarded.
+ </para>
+
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqgetrowdata">
+ <term>
+ <function>PQgetRowData</function>
+ <indexterm>
+ <primary>PQgetRowData</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ In single row mode it is possible to get row data directly,
+ without constructing <structname>PGresult</structname> for
+ each row.
+
+<synopsis>
+int PQgetRowData(PGconn *conn, PGresult **hdr, PGdataValue **columns);
+</synopsis>
+ </para>
+
+ <para>
+ It can be called wherever <function>PQgetResult</function> can.
+ It returns 1 and fills the pointers if there is row data available.
+ It returns 0 otherwise. Then <function>PQgetResult</function>
+ should be called to get the final status.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</para>
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1251455..a228a71 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -163,3 +163,5 @@ PQlibVersion 160
PQsetRowProcessor 161
PQgetRowProcessor 162
PQskipResult 163
+PQsetSingleRowMode 164
+PQgetRowData 165
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index badc0b3..ba9215b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1344,6 +1344,9 @@ PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
+ /* reset single-row processing */
+ conn->singleRowMode = false;
+
/* ready to send command message */
return true;
}
@@ -1548,6 +1551,166 @@ pqHandleSendFailure(PGconn *conn)
}
/*
+ * Set row-by-row processing mode.
+ */
+int
+PQsetSingleRowMode(PGconn *conn)
+{
+ /*
+ * avoid setting the flag at an inappropriate time
+ */
+
+ if (!conn)
+ return 0;
+ if (conn->asyncStatus != PGASYNC_BUSY)
+ return 0;
+ if (conn->queryclass != PGQUERY_SIMPLE && conn->queryclass != PGQUERY_EXTENDED)
+ return 0;
+ if (conn->result)
+ return 0;
+
+ /* set flag */
+ conn->singleRowMode = true;
+ return 1;
+}
+
+/*
+ * Create result that contains current row pointed by rowBuf.
+ */
+static PGresult *
+pqSingleRowResult(PGconn *conn)
+{
+ PGresult *res;
+ const char *errmsg = NULL;
+
+ /* Copy row header */
+ res = PQcopyResult(conn->result, PG_COPYRES_ATTRS);
+ if (!res)
+ goto nomem;
+
+ /* Set special status */
+ res->resultStatus = PGRES_SINGLE_TUPLE;
+
+ /*
+ * Use callback to fill the row.
+ */
+
+ switch ((*conn->rowProcessor) (res, conn->rowBuf, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return res;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+
+ /* free copied PGresult */
+ PQclear(res);
+
+nomem:
+
+ /*
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
+ */
+ pqClearAsyncResult(conn);
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+ pqSaveErrorResult(conn);
+
+ /*
+ * Fall back to standard PQgetResult() behaviour
+ */
+ return pqPrepareAsyncResult(conn);
+}
+
+/*
+ * Get raw row data from network buffer.
+ *
+ * It duplicates the flush/read logic of PQgetResult() to be able
+ * to work on sync connection. It does not return any error state,
+ * instead it leaves that to actual PQgetResult().
+ *
+ * Returns: 1 - have row data, 0 - do not have it
+ */
+int
+PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **cols)
+{
+ if (!conn)
+ return 0;
+
+ /* Parse any available data, if our state permits. */
+ parseInput(conn);
+
+ /* If not ready to return something, block until we are. */
+ while (conn->asyncStatus == PGASYNC_BUSY)
+ {
+ int flushResult;
+
+ /*
+ * If data remains unsent, send it. Else we might be waiting for the
+ * result of a command the backend hasn't even got yet.
+ */
+ while ((flushResult = pqFlush(conn)) > 0)
+ {
+ if (pqWait(FALSE, TRUE, conn))
+ {
+ flushResult = -1;
+ break;
+ }
+ }
+
+ /* Wait for some more data, and load it. */
+ if (flushResult ||
+ pqWait(TRUE, FALSE, conn) ||
+ pqReadData(conn) < 0)
+ {
+ /*
+ * conn->errorMessage has been set by pqWait or pqReadData. We
+ * want to append it to any already-received error message.
+ */
+ pqSaveErrorResult(conn);
+
+ /* Make PQgetResult() return the error */
+ conn->asyncStatus = PGASYNC_READY;
+ break;
+ }
+
+ /* Parse it. */
+ parseInput(conn);
+ }
+
+ /* should PQgetResult() be called instead? */
+ if (conn->asyncStatus != PGASYNC_ROW_READY)
+ return 0;
+
+ /* allow parsing to proceed */
+ conn->asyncStatus = PGASYNC_BUSY;
+
+ /* return pointers to current row */
+ *hdrp = conn->result;
+ *cols = conn->rowBuf;
+ return 1;
+}
+
+/*
* Consume any available input from the backend
* 0 return: some kind of trouble
* 1 return: no problem
@@ -1594,6 +1757,10 @@ PQconsumeInput(PGconn *conn)
static void
parseInput(PGconn *conn)
{
+ /* special case: there is data to parse, but we must not do it yet. */
+ if (conn->asyncStatus == PGASYNC_ROW_READY)
+ return;
+
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
pqParseInput3(conn);
else
@@ -1684,6 +1851,12 @@ PQgetResult(PGconn *conn)
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
break;
+ case PGASYNC_ROW_READY:
+ /* return copy of current row */
+ res = pqSingleRowResult(conn);
+ /* Set the state back to BUSY, allowing parsing to proceed. */
+ conn->asyncStatus = PGASYNC_BUSY;
+ break;
case PGASYNC_COPY_IN:
if (conn->result && conn->result->resultStatus == PGRES_COPY_IN)
res = pqPrepareAsyncResult(conn);
@@ -2525,6 +2698,9 @@ PQfn(PGconn *conn,
return NULL;
}
+ /* reset single row mode */
+ conn->singleRowMode = false;
+
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
return pqFunctionCall3(conn, fnid,
result_buf, actual_result_len,
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 8dbd6b6..3b97720 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -943,6 +943,15 @@ getAnotherTuple(PGconn *conn, bool binary)
*/
conn->inStart = conn->inCursor;
+ /*
+ * On single-row processing, show that row is available.
+ */
+ if (conn->singleRowMode)
+ {
+ conn->asyncStatus = PGASYNC_ROW_READY;
+ return EOF;
+ }
+
/* Pass the completed row values to rowProcessor */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 173af2e..1700c67 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -784,6 +784,15 @@ getAnotherTuple(PGconn *conn, int msgLength)
*/
conn->inStart = conn->inCursor;
+ /*
+ * On single-row processing, show that row is available.
+ */
+ if (conn->singleRowMode)
+ {
+ conn->asyncStatus = PGASYNC_ROW_READY;
+ return EOF;
+ }
+
/* Pass the completed row values to rowProcessor */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 67db611..d968c09 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -90,7 +90,8 @@ typedef enum
* backend */
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
- PGRES_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGRES_SINGLE_TUPLE /* PGresult for single tuple from bigger resultset */
} ExecStatusType;
typedef enum
@@ -406,6 +407,9 @@ extern int PQsendQueryPrepared(PGconn *conn,
extern PGresult *PQgetResult(PGconn *conn);
extern PGresult *PQskipResult(PGconn *conn);
+extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **columns);
+
/* Routines for managing an asynchronous query */
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4bc8926..c779381 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -219,7 +219,8 @@ typedef enum
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_ROW_READY /* single-row result ready for PQgetResult */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
@@ -407,6 +408,9 @@ struct pg_conn
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
+ /* process result row-by-row */
+ bool singleRowMode;
+
/* Assorted state for SSL, GSS, etc */
#ifdef USE_SSL
--
Sent via pgsql-hackers mailing list ([email protected])