Interested, yes. But there may be reasons not to do that. Last time I
looked the binary format wasn't documented.
Anyway, I tried re-implementing pqGetCopyData3() using the callback.
Wasn't hard, but I did have to add a way for the callback to return an
error. Kept it pretty dumb for now, hoping a sensible rule will become
obvious later.
Saw no obvious performance impact, so that's good.
Jeroen
On Fri, 3 Mar 2023 at 19:53, Tom Lane <[email protected]> wrote:
> Jeroen Vermeulen <[email protected]> writes:
> > The printf() is just the simplest example that sprang to mind though.
> > There may be other use-cases out there involving libraries that require
> > zero-terminated strings, and I figured an ability to set a sentinel could
> > help those.
>
> Well, since it won't help for binary COPY, I'm skeptical that this is
> something we should cater to. Anybody who's sufficiently worried about
> performance to be trying to remove malloc/free cycles ought to be
> interested in binary COPY as well.
>
> regards, tom lane
>
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..35f8c7a36f
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,132 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), then run the binary through "time" to get time and
+ * CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE. It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * PQhandleCopyData handler: write one COPY row verbatim (no newline added).
+ */
+static int
+print_row_and_newline(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+#endif
+
+
+int
+main()
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyDta, 1 = PQhandleCopyData."
+#elif CALL == 0
+ fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+ fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+
+ PGconn *cx = PQconnectdb("");
+
+ if (!cx)
+ {
+ fprintf(stderr, "Could not connect.\n");
+ exit(1);
+ }
+ PGresult *tx = PQexec(cx, "BEGIN");
+
+ if (!tx)
+ {
+ fprintf(stderr, "No result from BEGIN!\n");
+ exit(1);
+ }
+ int s = PQresultStatus(tx);
+
+ if (s != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+ exit(1);
+ }
+
+ PGresult *r = PQexec(
+ cx,
+ "COPY ("
+ "SELECT generate_series, 'row #' || generate_series "
+ "FROM generate_series(1, 100000000)"
+ ") TO STDOUT"
+ );
+
+ if (!r)
+ {
+ fprintf(stderr, "No result!\n");
+ exit(1);
+ }
+ int status = PQresultStatus(r);
+
+ if (status != PGRES_COPY_OUT)
+ {
+ fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+ exit(1);
+ }
+
+ int bytes;
+#if CALL == 0
+ char *buffer = NULL;
+
+ for (
+ bytes = PQgetCopyData(cx, &buffer, 0);
+ bytes > 0;
+ bytes = PQgetCopyData(cx, &buffer, 0)
+ )
+ {
+ if (buffer)
+ {
+ printf("%s", buffer);
+ PQfreemem(buffer);
+ }
+ }
+#elif CALL == 1
+ while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0);
+#else
+#error "Unknown CALL value."
+#endif
+
+ if (bytes != -1)
+ {
+ fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+ exit(1);
+ }
+
+ /* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..aaf31ea216 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,16 @@
#include "sqlda-compat.h"
#include "sqlda-native.h"
+/*
+ * PQhandleCopyData() handler: write one row received from COPY OUT to
+ * stdout.  The row is not zero-terminated, so it is written using its
+ * explicit length.  The context argument is unused.
+ */
+static int
+print_row(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+
/*
* This function returns a newly malloced string that has ' and \
* escaped.
@@ -1876,16 +1886,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
break;
case PGRES_COPY_OUT:
{
- char *buffer;
int res;
ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
- while ((res = PQgetCopyData(stmt->connection->connection,
- &buffer, 0)) > 0)
- {
- printf("%s", buffer);
- PQfreemem(buffer);
- }
+ while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0);
if (res == -1)
{
/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus 183
PQsetTraceFlags 184
PQmblenBounded 185
PQsendFlushRequest 186
+PQhandleCopyData 187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..1c9f0ee175 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2684,6 +2684,27 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
return 1;
}
+/*
+ * State shared between PQgetCopyData() and its alloc_copy_buffer() handler.
+ */
+struct GetCopyData_context
+{
+	PGconn	   *conn;			/* connection, used for error reporting */
+	char	   *buffer;			/* malloc'd copy of the row, or NULL */
+};
+
+/*
+ * PQhandleCopyData() handler used by PQgetCopyData(): copy the row into a
+ * freshly malloc'd, zero-terminated buffer and store it in the context.
+ *
+ * Returns 0 on success, or -2 after appending "out of memory" to the
+ * connection's error message if the allocation fails.
+ */
+static int
+alloc_copy_buffer(void *context, const char *inbuf, size_t len)
+{
+	struct GetCopyData_context *params = (struct GetCopyData_context *) context;
+
+	params->buffer = (char *) malloc(len + 1);
+	if (params->buffer == NULL)
+	{
+		libpq_append_conn_error(params->conn, "out of memory");
+		return -2;
+	}
+	/* Copy from the row we were handed, not from conn's internal cursor. */
+	memcpy(params->buffer, inbuf, len);
+	params->buffer[len] = '\0';	/* Add terminating null */
+	return 0;
+}
+
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
@@ -2697,7 +2718,41 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
int
PQgetCopyData(PGconn *conn, char **buffer, int async)
{
- *buffer = NULL; /* for all failure cases */
+	struct GetCopyData_context context;
+	int			result;
+
+	context.conn = conn;
+	context.buffer = NULL;		/* for all failure cases */
+
+	/*
+	 * Go through PQhandleCopyData() rather than calling pqGetCopyData3()
+	 * directly, so that the NULL-connection and "no COPY in progress" checks
+	 * still apply.  On every path that does not deliver a row,
+	 * context.buffer stays NULL.
+	 */
+	result = PQhandleCopyData(conn, alloc_copy_buffer, &context, async);
+	*buffer = context.buffer;
+	return result;
+}
+
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied handler callback.
+ *
+ * handler (which must not be NULL) is called with the caller's context
+ * pointer, a pointer to the row data, and the row's length in bytes.  The
+ * row data is NOT zero-terminated, so the handler must not read beyond the
+ * given length.  It points into libpq's input buffer, so a handler that
+ * needs the data after it returns should copy it.
+ *
+ * The handler is called only after a full row has been received.  It should
+ * return 0 on success.  A negative return value is passed back unchanged as
+ * this function's result, so a failing handler should set an error message
+ * and return -2 (-1 would be indistinguishable from end of copy).
+ *
+ * The context pointer can be anything; this function just passes it on to
+ * the handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, returns -2 (consult PQerrorMessage).
+ */
+int
+PQhandleCopyData(PGconn *conn,
+ int (*handler) (void *, const char *, size_t),
+ void *context,
+ int async)
+{
if (!conn)
return -2;
if (conn->asyncStatus != PGASYNC_COPY_OUT &&
@@ -2706,7 +2761,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
libpq_append_conn_error(conn, "no COPY in progress");
return -2;
}
- return pqGetCopyData3(conn, buffer, async);
+ return pqGetCopyData3(conn, handler, context, async);
}
/*
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..8c89462704 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1720,27 +1720,35 @@ getCopyDataMessage(PGconn *conn)
}
/*
- * PQgetCopyData - read a row of data from the backend during COPY OUT
- * or COPY BOTH
+ * pqGetCopyData3 - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied handler callback.
*
- * If successful, sets *buffer to point to a malloc'd row of data, and
- * returns row length (always > 0) as result.
- * Returns 0 if no row available yet (only possible if async is true),
- * -1 if end of copy (consult PQgetResult), or -2 if error (consult
- * PQerrorMessage).
+ * The handler receives the context pointer, a pointer to the row data, and
+ * the row's length.  The row data points into conn->inBuffer and is NOT
+ * zero-terminated, so the handler must not read beyond the given length.
+ * The message is marked consumed whether or not the handler succeeds.
+ *
+ * The handler is called only after a complete, non-empty row has been
+ * received; handler must not be NULL.  It should return 0 on success.  A
+ * negative return value is passed back unchanged as this function's result,
+ * so a failing handler should set an error message and return -2 (-1 would
+ * be indistinguishable from end of copy).
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, returns -2 (consult PQerrorMessage).
*/
int
-pqGetCopyData3(PGconn *conn, char **buffer, int async)
+pqGetCopyData3(PGconn *conn,
+ int (*handler) (void *, const char *, size_t),
+ void *context,
+ int async)
{
int msgLength;
for (;;)
{
- /*
- * Collect the next input message. To make life simpler for async
- * callers, we keep returning 0 until the next message is fully
- * available, even if it is not Copy Data.
- */
msgLength = getCopyDataMessage(conn);
if (msgLength < 0)
return msgLength; /* end-of-copy or error */
@@ -1756,29 +1764,19 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
continue;
}
- /*
- * Drop zero-length messages (shouldn't happen anyway). Otherwise
- * pass the data back to the caller.
- */
msgLength -= 4;
if (msgLength > 0)
{
- *buffer = (char *) malloc(msgLength + 1);
- if (*buffer == NULL)
- {
- libpq_append_conn_error(conn, "out of memory");
- return -2;
- }
- memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
- (*buffer)[msgLength] = '\0'; /* Add terminating null */
-
- /* Mark message consumed */
+ /* We have a row. Call the handler. */
+ int result = handler(context,
+ &conn->inBuffer[conn->inCursor],
+ msgLength);
conn->inStart = conn->inCursor + msgLength;
-
+ if (result < 0)
+ return result;
return msgLength;
}
- /* Empty, so drop it and loop around for another */
conn->inStart = conn->inCursor;
}
}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..4963aa1e51 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,12 @@ extern int PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
extern int PQputCopyEnd(PGconn *conn, const char *errormsg);
extern int PQgetCopyData(PGconn *conn, char **buffer, int async);
+/*
+ * Like PQgetCopyData, but instead of returning a malloc'd copy of each row,
+ * passes the row (not zero-terminated) and its length to handler.
+ * TODO: "House style" would be int, rather than size_t.
+ */
+extern int	PQhandleCopyData(PGconn *conn,
+							 int (*handler) (void *context, const char *buf,
+											 size_t len),
+							 void *context,
+							 int async);
+
/* Deprecated routines for copy in/out */
extern int PQgetline(PGconn *conn, char *buffer, int length);
extern int PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..b5f8b609bc 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -686,7 +686,10 @@ extern int pqGetErrorNotice3(PGconn *conn, bool isError);
extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
PGVerbosity verbosity, PGContextVisibility show_context);
extern int pqGetNegotiateProtocolVersion3(PGconn *conn);
-extern int pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int pqGetCopyData3(PGconn *conn,
+ int (*handler) (void *, const char *, size_t),
+ void *context,
+ int async);
extern int pqGetline3(PGconn *conn, char *s, int maxlen);
extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
extern int pqEndcopy3(PGconn *conn);