The row-processor API is now in 9.2, but it solves only the
"different-row-storage" problem, not the "one-row-at-a-time"
problem, as libpq is still in control until all rows are received.

This means libpq still cannot be used to implement the iterative
result processing that almost all high-level languages use.

We discussed a potential API for fetching a single row at a time,
but did not reach a conclusion.  Basic arguments were:

1) Tom: PQisBusy must keep current behaviour.  Thus also PQgetResult()
   must keep current behaviour:
   * PQisBusy() -> 0: need to call PQgetResult(), which returns PGresult
   * PQisBusy() -> 1: need to call PQconsumeInput()
   * PQisBusy() must be callable several times in a row, thus be
     stateless from the client's POV.

2) Me: behaviour must be controlled not by a callback, but by client
   code that uses PQgetResult() + PQisBusy().

Now, looking at the problem with some perspective, the solution
is obvious: when in single-row mode, PQgetResult() must return
a proper PGresult for that single row.  Everything else follows from that.

Such an API is implemented in the attached patch:

* PQsetSingleRowMode(conn): sets single-row mode.

* PQisBusy(): stops after each row in single-row mode and sets
  PGASYNC_ROW_READY.  This keeps the property of being repeatedly callable.

* PQgetResult(): returns a copy of the row if the state is
  PGASYNC_ROW_READY, with resultStatus set to PGRES_SINGLE_TUPLE.  This
  must differ from PGRES_TUPLES_OK so that the resultset end can be detected.

* PQgetRowData(): can be called instead of PQgetResult() to get raw row
  data in the buffer, for more efficient processing.  This is an optional
  feature that delivers the original row-callback promise of avoiding
  unnecessary copying of row data.

* Although PQgetRowData() makes the callback API unnecessary, it is still
  fully compatible with it: the callback should not see any difference
  whether the resultset is processed in single-row mode or in the
  old single-PGresult mode.  Unless it wants to: it can check
  PGRES_TUPLES_OK vs. PGRES_SINGLE_TUPLE.
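
To make the state transitions listed above easier to follow, here is a
toy model in plain C.  It is only a sketch and not libpq code: ToyConn,
the toy_* functions and the "messages arrived" counter are hypothetical
names invented for illustration.  Only the BUSY / ROW_READY / READY
states and the single-tuple vs. tuples-ok distinction are taken from the
patch.  It assumes a server that sends N data rows followed by one
completion message.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-ins for the states named in the patch; none of this is libpq. */
typedef enum { TOY_BUSY, TOY_ROW_READY, TOY_READY, TOY_IDLE } ToyAsync;
typedef enum { TOY_SINGLE_TUPLE, TOY_TUPLES_OK } ToyStatus;

typedef struct {
	ToyAsync state;
	bool     single_row_mode;
	int      rows_total;    /* data rows the pretend server sends */
	int      msgs_arrived;  /* rows plus one completion message */
	int      msgs_parsed;   /* messages already consumed by the parser */
} ToyConn;

/* Start a pretend query that will produce rows_total rows. */
ToyConn toy_send_query(int rows_total)
{
	ToyConn c = {TOY_BUSY, false, rows_total, 0, 0};
	assert(rows_total >= 0);
	return c;
}

/* Pretend one more message arrived from the network. */
void toy_consume_input(ToyConn *c)
{
	if (c->msgs_arrived < c->rows_total + 1)
		c->msgs_arrived++;
}

/* Allowed only before any row has been parsed, like the patch's check. */
int toy_set_single_row_mode(ToyConn *c)
{
	if (c->state != TOY_BUSY || c->msgs_parsed > 0)
		return 0;
	c->single_row_mode = true;
	return 1;
}

/* Parse arrived messages; does nothing unless the state is BUSY. */
void toy_parse_input(ToyConn *c)
{
	while (c->state == TOY_BUSY && c->msgs_parsed < c->msgs_arrived) {
		bool is_row = c->msgs_parsed++ < c->rows_total;
		if (!is_row)
			c->state = TOY_READY;       /* completion message seen */
		else if (c->single_row_mode)
			c->state = TOY_ROW_READY;   /* stop: hand this row to the client */
	}
}

/* Repeatedly callable: a second call in ROW_READY changes nothing. */
int toy_is_busy(ToyConn *c)
{
	toy_parse_input(c);
	return c->state == TOY_BUSY;
}

/* Returns false where PQgetResult() would return NULL. */
bool toy_get_result(ToyConn *c, ToyStatus *status)
{
	toy_parse_input(c);
	while (c->state == TOY_BUSY) {      /* stand-in for blocking on the socket */
		toy_consume_input(c);
		toy_parse_input(c);
	}
	if (c->state == TOY_ROW_READY) {
		*status = TOY_SINGLE_TUPLE;
		c->state = TOY_BUSY;            /* let parsing proceed to the next row */
		return true;
	}
	if (c->state == TOY_READY) {
		*status = TOY_TUPLES_OK;
		c->state = TOY_IDLE;
		return true;
	}
	return false;
}

/* Client loop in the PQisBusy() / PQconsumeInput() / PQgetResult() style. */
int toy_count_single_rows(ToyConn *c)
{
	ToyStatus st;
	int rows = 0;
	for (;;) {
		while (toy_is_busy(c))
			toy_consume_input(c);
		if (!toy_get_result(c, &st))
			break;
		if (st == TOY_SINGLE_TUPLE)
			rows++;
	}
	return rows;
}
```

A real client would wait for the socket to become readable before each
consume-input call instead of spinning.  The point of the sketch is that
the client loop has the same shape in both modes: without single-row
mode it sees one tuples-ok result, and with it a run of single-tuple
results followed by the final tuples-ok.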

There is some duplicate code here that can be refactored (callback exec),
but I have not done that yet, to avoid affecting existing code too much.

-- 
marko

PS. If I squint, it seems like a fix of the existing API instead of a new
feature, so perhaps it can still fit into 9.2?

commit 4114613 (HEAD, single-row)
Author: Marko Kreen <mark...@gmail.com>
Date:   Sat Apr 7 15:05:01 2012 +0300

    Single-row based processing

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 5c5dd68..0ea2c1f 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4018,6 +4018,75 @@ PGresult *PQgetResult(PGconn *conn);
       </note>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-pqsetsinglerowmode">
+     <term>
+      <function>PQsetSingleRowMode</function>
+      <indexterm>
+       <primary>PQsetSingleRowMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Instead of buffering all rows in <structname>PGresult</structname>
+       until full resultset has arrived, this changes resultset processing
+       to return rows as soon as they arrive from network.
+
+<synopsis>
+int PQsetSingleRowMode(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       The mode can be changed directly after
+       <function>PQsendQuery</function>,
+       <function>PQsendQueryParams</function>,
+       <function>PQsendQueryPrepared</function> call, and before
+       any result rows have arrived from network.  Then this function
+       changes mode and returns 1.  Otherwise the mode stays unchanged
+       and this function returns 0.
+      </para>
+
+      <para>
+       The rows returned have PQresultStatus() of <literal>PGRES_SINGLE_TUPLE</literal>.
+       There will be final PGresult that has either <literal>PGRES_TUPLES_OK</literal>
+       or <literal>PGRES_FATAL_ERROR</literal> result status.  In case
+       of error status, the actual query failed in the middle and received rows
+       should be dropped.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqgetrowdata">
+     <term>
+      <function>PQgetRowData</function>
+      <indexterm>
+       <primary>PQgetRowData</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       In single row mode it is possible to get row data directly,
+       without constructing <structname>PGresult</structname> for
+       each row.
+
+<synopsis>
+int PQgetRowData(PGconn *conn, PGresult **hdr, PGdataValue **columns);
+</synopsis>
+      </para>
+
+      <para>
+       It can be called everywhere <function>PQgetResult</function> can.
+       It returns 1 and fills pointers if there is row data available.
+       It returns 0 otherwise.  Then <function>PQgetResult</function>
+       should be called to get final status.
+      </para>
+     </listitem>
+    </varlistentry>
+
    </variablelist>
   </para>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1251455..a228a71 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -163,3 +163,5 @@ PQlibVersion              160
 PQsetRowProcessor         161
 PQgetRowProcessor         162
 PQskipResult              163
+PQsetSingleRowMode        164
+PQgetRowData              165
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index badc0b3..ba9215b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1344,6 +1344,9 @@ PQsendQueryStart(PGconn *conn)
 	/* initialize async result-accumulation state */
 	conn->result = NULL;
 
+	/* reset single-row processing */
+	conn->singleRowMode = false;
+
 	/* ready to send command message */
 	return true;
 }
@@ -1548,6 +1551,166 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
+ * Set row-by-row processing mode.
+ */
+int
+PQsetSingleRowMode(PGconn *conn)
+{
+	/*
+	 * avoid setting the flag in inappropriate time
+	 */
+
+	if (!conn)
+		return 0;
+	if (conn->asyncStatus != PGASYNC_BUSY)
+		return 0;
+	if (conn->queryclass != PGQUERY_SIMPLE && conn->queryclass != PGQUERY_EXTENDED)
+		return 0;
+	if (conn->result)
+		return 0;
+
+	/* set flag */
+	conn->singleRowMode = true;
+	return 1;
+}
+
+/*
+ * Create result that contains current row pointed by rowBuf.
+ */
+static PGresult *
+pqSingleRowResult(PGconn *conn)
+{
+	PGresult		*res;
+	const char		*errmsg = NULL;
+
+	/* Copy row header */
+	res = PQcopyResult(conn->result, PG_COPYRES_ATTRS);
+	if (!res)
+		goto nomem;
+
+	/* Set special status */
+	res->resultStatus = PGRES_SINGLE_TUPLE;
+
+	/*
+	 * Use callback to fill the row.
+	 */
+
+	switch ((*conn->rowProcessor) (res, conn->rowBuf, &errmsg,
+								   conn->rowProcessorParam))
+	{
+		case 1:
+			/* everything is good */
+			return res;
+
+		case -1:
+			/* error, report the errmsg below */
+			break;
+
+		default:
+			/* unrecognized return code */
+			errmsg = libpq_gettext("unrecognized return value from row processor");
+			break;
+	}
+
+	/* free copied PGresult */
+	PQclear(res);
+
+nomem:
+
+	/*
+	 * Replace partially constructed result with an error result. First
+	 * discard the old result to try to win back some memory.
+	 */
+	pqClearAsyncResult(conn);
+
+	/*
+	 * If row processor didn't provide an error message, assume "out of
+	 * memory" was meant.  The advantage of having this special case is that
+	 * freeing the old result first greatly improves the odds that gettext()
+	 * will succeed in providing a translation.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory for query result");
+
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+	pqSaveErrorResult(conn);
+
+	/*
+	 * Fall back to standard PQgetResult() behaviour
+	 */
+	return pqPrepareAsyncResult(conn);
+}
+
+/*
+ * Get raw row data from network buffer.
+ *
+ * It duplicates the flush/read logic of PQgetResult() to be able
+ * to work on sync connection.  It does not return any error state,
+ * instead it leaves that to actual PQgetResult().
+ *
+ *		Returns: 1 - have row data, 0 - do not have it
+ */
+int
+PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **cols)
+{
+	if (!conn)
+		return 0;
+
+	/* Parse any available data, if our state permits. */
+	parseInput(conn);
+
+	/* If not ready to return something, block until we are. */
+	while (conn->asyncStatus == PGASYNC_BUSY)
+	{
+		int			flushResult;
+
+		/*
+		 * If data remains unsent, send it.  Else we might be waiting for the
+		 * result of a command the backend hasn't even got yet.
+		 */
+		while ((flushResult = pqFlush(conn)) > 0)
+		{
+			if (pqWait(FALSE, TRUE, conn))
+			{
+				flushResult = -1;
+				break;
+			}
+		}
+
+		/* Wait for some more data, and load it. */
+		if (flushResult ||
+			pqWait(TRUE, FALSE, conn) ||
+			pqReadData(conn) < 0)
+		{
+			/*
+			 * conn->errorMessage has been set by pqWait or pqReadData. We
+			 * want to append it to any already-received error message.
+			 */
+			pqSaveErrorResult(conn);
+
+			/* Make PQgetResult() return the error */
+			conn->asyncStatus = PGASYNC_READY;
+			break;
+		}
+
+		/* Parse it. */
+		parseInput(conn);
+	}
+
+	/* should PQgetResult() be called instead? */
+	if (conn->asyncStatus != PGASYNC_ROW_READY)
+		return 0;
+
+	/* allow parsing to proceed */
+	conn->asyncStatus = PGASYNC_BUSY;
+
+	/* return pointers to current row */
+	*hdrp = conn->result;
+	*cols = conn->rowBuf;
+	return 1;
+}
+
+/*
  * Consume any available input from the backend
  * 0 return: some kind of trouble
  * 1 return: no problem
@@ -1594,6 +1757,10 @@ PQconsumeInput(PGconn *conn)
 static void
 parseInput(PGconn *conn)
 {
+	/* special case: there is data to parse, but we must not do it yet. */
+	if (conn->asyncStatus == PGASYNC_ROW_READY)
+		return;
+
 	if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
 		pqParseInput3(conn);
 	else
@@ -1684,6 +1851,12 @@ PQgetResult(PGconn *conn)
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
 			break;
+		case PGASYNC_ROW_READY:
+			/* return copy of current row */
+			res = pqSingleRowResult(conn);
+			/* Set the state back to BUSY, allowing parsing to proceed. */
+			conn->asyncStatus = PGASYNC_BUSY;
+			break;
 		case PGASYNC_COPY_IN:
 			if (conn->result && conn->result->resultStatus == PGRES_COPY_IN)
 				res = pqPrepareAsyncResult(conn);
@@ -2525,6 +2698,9 @@ PQfn(PGconn *conn,
 		return NULL;
 	}
 
+	/* reset single row mode */
+	conn->singleRowMode = false;
+
 	if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
 		return pqFunctionCall3(conn, fnid,
 							   result_buf, actual_result_len,
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 8dbd6b6..3b97720 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -943,6 +943,15 @@ getAnotherTuple(PGconn *conn, bool binary)
 	 */
 	conn->inStart = conn->inCursor;
 
+	/*
+	 * On single-row processing, show that row is available.
+	 */
+	if (conn->singleRowMode)
+	{
+		conn->asyncStatus = PGASYNC_ROW_READY;
+		return EOF;
+	}
+
 	/* Pass the completed row values to rowProcessor */
 	errmsg = NULL;
 	switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 173af2e..1700c67 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -784,6 +784,15 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	 */
 	conn->inStart = conn->inCursor;
 
+	/*
+	 * On single-row processing, show that row is available.
+	 */
+	if (conn->singleRowMode)
+	{
+		conn->asyncStatus = PGASYNC_ROW_READY;
+		return EOF;
+	}
+
 	/* Pass the completed row values to rowProcessor */
 	errmsg = NULL;
 	switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 67db611..d968c09 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -90,7 +90,8 @@ typedef enum
 								 * backend */
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
-	PGRES_COPY_BOTH				/* Copy In/Out data transfer in progress */
+	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGRES_SINGLE_TUPLE			/* PGresult for single tuple from bigger resultset */
 } ExecStatusType;
 
 typedef enum
@@ -406,6 +407,9 @@ extern int PQsendQueryPrepared(PGconn *conn,
 extern PGresult *PQgetResult(PGconn *conn);
 extern PGresult *PQskipResult(PGconn *conn);
 
+extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQgetRowData(PGconn *conn, PGresult **hdrp, PGdataValue **columns);
+
 /* Routines for managing an asynchronous query */
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4bc8926..c779381 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -219,7 +219,8 @@ typedef enum
 	PGASYNC_READY,				/* result ready for PQgetResult */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_ROW_READY			/* single-row result ready for PQgetResult */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -407,6 +408,9 @@ struct pg_conn
 	/* Status for asynchronous result construction */
 	PGresult   *result;			/* result being constructed */
 
+	/* process result row-by-row */
+	bool singleRowMode;
+
 	/* Assorted state for SSL, GSS, etc */
 
 #ifdef USE_SSL