My apologies. The wiki said to discuss early, even before writing the code
if possible, but I added a link to the PR for those who really wanted to
see the details.
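I'm attaching a diff now. This is not a patch; it's just a discussion
piece.
The problem was that PQgetCopyData loops use a lot of CPU time, and this
alternative reduces that by a lot: in the attached benchmark, user CPU time
drops from about 11.4s to 3.4s for essentially the same wall-clock time.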
Jeroen
On Thu, 2 Mar 2023 at 13:38, Daniel Gustafsson <[email protected]> wrote:
> > On 1 Mar 2023, at 15:23, Jeroen Vermeulen <[email protected]> wrote:
>
> > PR for easy discussion: https://github.com/jtv/postgres/pull/1
>
> The process for discussing work on pgsql-hackers is to attach the patch to
> the
> email and discuss it inline in the thread. That way all versions of the
> patch
> as well as the discussion is archived and searchable.
>
> --
> Daniel Gustafsson
>
>
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..c3206d2927
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,134 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL as 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), e.g. by compiling with -DCALL=1, then run the
+ * binary through "time" to get time and CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE. It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * Print one COPY row, followed by a newline.
+ *
+ * Callback for PQhandleCopyData().  The row in buf is NOT zero-terminated,
+ * so at most len bytes are printed.  If the row already ends in a newline,
+ * that newline is not printed twice.  The buffer itself is left untouched,
+ * so rows that do not end in a newline lose no data.
+ *
+ * Always returns 0 (the handler return value is not yet meaningful).
+ */
+static int
+print_row_and_newline(void *context, char *buf, size_t len)
+{
+	/* Unused, but ISO C before C23 requires parameters to be named. */
+	(void) context;
+
+	if (len > 0 && buf[len - 1] == '\n')
+		len--;
+
+	/* Rows come from an int-sized protocol message, so len fits in int. */
+	printf("%.*s\n", (int) len, buf);
+	return 0;
+}
+#endif
+
+
+int
+main()
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyDta, 1 = PQhandleCopyData."
+#elif CALL == 0
+ fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+ fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+
+ PGconn *cx = PQconnectdb("");
+
+ if (!cx)
+ {
+ fprintf(stderr, "Could not connect.\n");
+ exit(1);
+ }
+ PGresult *tx = PQexec(cx, "BEGIN");
+
+ if (!tx)
+ {
+ fprintf(stderr, "No result from BEGIN!\n");
+ exit(1);
+ }
+ int s = PQresultStatus(tx);
+
+ if (s != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+ exit(1);
+ }
+
+ PGresult *r = PQexec(
+ cx,
+ "COPY ("
+ "SELECT generate_series, 'row #' || generate_series "
+ "FROM generate_series(1, 100000000)"
+ ") TO STDOUT"
+ );
+
+ if (!r)
+ {
+ fprintf(stderr, "No result!\n");
+ exit(1);
+ }
+ int status = PQresultStatus(r);
+
+ if (status != PGRES_COPY_OUT)
+ {
+ fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+ exit(1);
+ }
+
+ int bytes;
+#if CALL == 0
+ char *buffer = NULL;
+
+ for (
+ bytes = PQgetCopyData(cx, &buffer, 0);
+ bytes > 0;
+ bytes = PQgetCopyData(cx, &buffer, 0)
+ )
+ {
+ if (buffer)
+ {
+ printf("%s", buffer);
+ PQfreemem(buffer);
+ }
+ }
+#elif CALL == 1
+ while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0);
+#else
+#error "Unknown CALL value."
+#endif
+
+ if (bytes != -1)
+ {
+ fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+ exit(1);
+ }
+
+ /* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..1e4eba58a2 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,17 @@
#include "sqlda-compat.h"
#include "sqlda-native.h"
+/*
+ * Print one row received from COPY OUT, exactly as received.
+ *
+ * Callback for PQhandleCopyData().  The row in buf is NOT zero-terminated,
+ * so at most len bytes are printed; text-format rows carry their own
+ * trailing newline, so none is added.  As with printing a zero-terminated
+ * copy of the row via "%s", output stops at any embedded zero byte.  The
+ * buffer is not modified.
+ *
+ * Always returns 0 (the handler return value is not yet meaningful).
+ */
+static int
+print_row(void *context, char *buf, size_t len)
+{
+	/* Unused, but ISO C before C23 requires parameters to be named. */
+	(void) context;
+
+	/* Rows come from an int-sized protocol message, so len fits in int. */
+	printf("%.*s", (int) len, buf);
+	return 0;
+}
+
/*
* This function returns a newly malloced string that has ' and \
* escaped.
@@ -1876,16 +1887,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
break;
case PGRES_COPY_OUT:
{
- char *buffer;
int res;
ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
- while ((res = PQgetCopyData(stmt->connection->connection,
- &buffer, 0)) > 0)
- {
- printf("%s", buffer);
- PQfreemem(buffer);
- }
+ while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0);
if (res == -1)
{
/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus 183
PQsetTraceFlags 184
PQmblenBounded 185
PQsendFlushRequest 186
+PQhandleCopyData 187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..d0f0501e80 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2709,6 +2709,42 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
return pqGetCopyData3(conn, buffer, async);
}
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied callback.
+ *
+ * "handler" is called as handler(context, buf, len), where buf holds the
+ * row's bytes and len is their count.  Its return value is currently still
+ * ignored, but could become a flag like "this ride is making me sick and I'd
+ * like to get off."
+ *
+ * The handler is called only after a full row has been received.  The
+ * buffer does NOT have a terminating zero, so do not go beyond the given
+ * size.  It points into the connection's own input buffer, so it is valid
+ * only until the handler returns: the handler may modify its contents, but
+ * must not keep the pointer.  In text-format COPY OUT each row ends in a
+ * newline, which the handler may overwrite if it needs a terminating zero;
+ * binary-format COPY and COPY BOTH data need not end in a newline.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure (including a NULL conn or handler), does not call handler, and
+ * returns -2 (consult PQerrorMessage).
+ */
+int
+PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+	if (!conn)
+		return -2;
+	/* pqHandleCopyData3 would dereference a NULL handler; reject it here. */
+	if (!handler)
+	{
+		libpq_append_conn_error(conn, "no COPY data handler given");
+		return -2;
+	}
+	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
+		conn->asyncStatus != PGASYNC_COPY_BOTH)
+	{
+		libpq_append_conn_error(conn, "no COPY in progress");
+		return -2;
+	}
+	return pqHandleCopyData3(conn, handler, context, async);
+}
+
/*
* PQgetline - gets a newline-terminated string from the backend.
*
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..1f4ecec9bd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1783,6 +1783,63 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
}
}
+/*
+ * pqHandleCopyData3 - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied handler callback.
+ *
+ * Protocol-3 worker for PQhandleCopyData.  It does not itself check conn or
+ * conn->asyncStatus; the caller is expected to have done so.
+ *
+ * Pass a "handler" callback which takes the context pointer, a buffer and
+ * its size.  (Its return value is currently still ignored, but could become
+ * a flag like "this ride is making me sick and I'd like to get off.")
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.  The buffer points
+ * directly into conn->inBuffer, so it is only valid until the handler
+ * returns: the handler may modify its contents but must not keep the
+ * pointer.  NOTE(review): calling libpq on this connection from inside the
+ * handler may read more input into conn->inBuffer and invalidate the
+ * buffer; avoid it.  In text-format COPY OUT each row ends in a newline,
+ * which the handler may overwrite with a terminating zero; binary-format
+ * COPY and COPY BOTH data need not end in a newline.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
+ */
+int
+pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+ int msgLength;
+
+ for (;;)
+ {
+ msgLength = getCopyDataMessage(conn);
+ if (msgLength < 0)
+ return msgLength; /* end-of-copy or error */
+ if (msgLength == 0)
+ {
+ /* Don't block if async read requested */
+ if (async)
+ return 0;
+ /* Need to load more data */
+ if (pqWait(true, false, conn) ||
+ pqReadData(conn) < 0)
+ return -2;
+ continue;
+ }
+
+ /* The message length includes the 4-byte length word; drop it. */
+ msgLength -= 4;
+ if (msgLength > 0)
+ {
+ /*
+ * We have a row.  Call the handler directly on the input buffer (no
+ * copy); its return value is deliberately ignored for now.
+ */
+ handler(context, &conn->inBuffer[conn->inCursor], msgLength);
+ /* Mark the message consumed only after the handler is done with it. */
+ conn->inStart = conn->inCursor + msgLength;
+ return msgLength;
+ }
+
+ /* Empty row: discard the message and loop around for another. */
+ conn->inStart = conn->inCursor;
+ }
+}
+
/*
* PQgetline - gets a newline-terminated string from the backend.
*
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..c07544b255 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,9 @@ extern int PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
extern int PQputCopyEnd(PGconn *conn, const char *errormsg);
extern int PQgetCopyData(PGconn *conn, char **buffer, int async);
+/*
+ * Like PQgetCopyData, but instead of returning a freshly allocated copy of
+ * each row, passes it to handler(context, buf, len); buf is NOT
+ * zero-terminated and is only valid during the call.  Returns the row length
+ * (> 0), 0 (async and no row ready yet), -1 (COPY done) or -2 (error).
+ *
+ * TODO: "House style" would be int, rather than size_t.
+ */
+extern int	PQhandleCopyData(PGconn *conn,
+							 int (*handler) (void *, char *, size_t),
+							 void *context, int async);
+
/* Deprecated routines for copy in/out */
extern int PQgetline(PGconn *conn, char *buffer, int length);
extern int PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..8936ea0388 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -687,6 +687,7 @@ extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
PGVerbosity verbosity, PGContextVisibility show_context);
extern int pqGetNegotiateProtocolVersion3(PGconn *conn);
extern int pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int	pqHandleCopyData3(PGconn *conn,
+							  int (*handler) (void *, char *, size_t),
+							  void *context, int async);
extern int pqGetline3(PGconn *conn, char *s, int maxlen);
extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
extern int pqEndcopy3(PGconn *conn);