This is something I hacked together on the way back from pgconf.eu.
It's highly experimental.
The idea is to do the equivalent of pg_wal_replay_wait() on the protocol
level, so that it is ideally fully transparent to the application code.
The application just issues queries, and they might be serviced by a
primary or a standby, but there is always a correct ordering of reads
after writes.
Additionally, I'm exploring whether this could serve as an example of a
protocol extension that is a bit more complex than, say, longer cancel
keys, and thus something we could anchor a discussion about protocol
versioning on.
The patch adds a protocol extension called _pq_.wait_for_lsn as well as
a libpq connection option wait_for_lsn to activate the same. (Use e.g.,
psql -d 'wait_for_lsn=1'.)
With this protocol extension, two things are changed:
- The ReadyForQuery message sends back the current LSN.
- The Query message sends an LSN to wait for. (This doesn't handle the
extended query protocol yet.)
To make any real use of this, you'd need some middleware, like a hacked
pgbouncer, that transparently redirects queries among primaries and
standbys, which doesn't exist yet. But if it did, I imagine it could be
pretty useful.
There might be other ways to slice this. Instead of using a
hypothetical middleware, the application could use two connections, one
for writing and one for reading, and the LSN would be communicated between
the two. I imagine that in this case, at least one half of the protocol,
shipping the current LSN with ReadyForQuery, could be useful, instead of
requiring application code to issue pg_current_wal_insert_lsn() explicitly.
Thoughts?
From 44b6354429847e3b3aeac21ee5712879b97d7877 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Sat, 26 Oct 2024 01:11:57 +0200
Subject: [PATCH v0] wait_for_lsn protocol option
---
src/backend/tcop/backend_startup.c | 15 +++++++++++++--
src/backend/tcop/dest.c | 12 ++++++++++++
src/backend/tcop/postgres.c | 23 +++++++++++++++++++++++
src/include/libpq/libpq-be.h | 1 +
src/interfaces/libpq/fe-connect.c | 26 ++++++++++++++++++++++++++
src/interfaces/libpq/fe-exec.c | 1 +
src/interfaces/libpq/fe-protocol3.c | 20 ++++++++++++++++++++
src/interfaces/libpq/fe-trace.c | 2 ++
src/interfaces/libpq/libpq-int.h | 3 +++
9 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/src/backend/tcop/backend_startup.c
b/src/backend/tcop/backend_startup.c
index 2a96c81f925..bd3b91d01eb 100644
--- a/src/backend/tcop/backend_startup.c
+++ b/src/backend/tcop/backend_startup.c
@@ -768,12 +768,23 @@ ProcessStartupPacket(Port *port, bool ssl_done, bool
gss_done)
valptr),
errhint("Valid values
are: \"false\", 0, \"true\", 1, \"database\".")));
}
+ else if (strcmp(nameptr, "_pq_.wait_for_lsn") == 0)
+ {
+ if (strcmp(valptr, "1") == 0)
+ port->wait_for_lsn_enabled = true;
+ else
+ ereport(FATAL,
+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value
for parameter \"%s\": \"%s\"",
+
"wait_for_lsn",
+ valptr),
+ errhint("Valid values
are: 1.")));
+ }
else if (strncmp(nameptr, "_pq_.", 5) == 0)
{
/*
* Any option beginning with _pq_. is reserved
for use as a
- * protocol-level option, but at present no
such options are
- * defined.
+ * protocol-level option.
*/
unrecognized_protocol_options =
lappend(unrecognized_protocol_options,
pstrdup(nameptr));
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 96f80b30463..bb9910b12d5 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -31,6 +31,8 @@
#include "access/printsimple.h"
#include "access/printtup.h"
#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
#include "commands/copy.h"
#include "commands/createas.h"
#include "commands/explain.h"
@@ -40,6 +42,7 @@
#include "executor/tstoreReceiver.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "miscadmin.h"
/* ----------------
@@ -265,6 +268,15 @@ ReadyForQuery(CommandDest dest)
pq_beginmessage(&buf, PqMsg_ReadyForQuery);
pq_sendbyte(&buf, TransactionBlockStatusCode());
+ if (MyProcPort->wait_for_lsn_enabled)
+ {
+ char xloc[MAXFNAMELEN];
+ XLogRecPtr logptr;
+
+ logptr = GetXLogWriteRecPtr();
+ snprintf(xloc, sizeof(xloc), "%X/%X",
LSN_FORMAT_ARGS(logptr));
+ pq_sendstring(&buf, xloc);
+ }
pq_endmessage(&buf);
}
/* Flush output at end of cycle in any case. */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7f5eada9d45..aee6fec1fc4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -38,6 +38,7 @@
#include "commands/async.h"
#include "commands/event_trigger.h"
#include "commands/prepare.h"
+#include "commands/waitlsn.h"
#include "common/pg_prng.h"
#include "jit/jit.h"
#include "libpq/libpq.h"
@@ -75,6 +76,7 @@
#include "utils/injection_point.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timeout.h"
@@ -4782,6 +4784,27 @@ PostgresMain(const char *dbname, const char *username)
SetCurrentStatementStartTimestamp();
query_string =
pq_getmsgstring(&input_message);
+ if (MyProcPort &&
MyProcPort->wait_for_lsn_enabled)
+ {
+ const char *wait_for_lsn =
pq_getmsgstring(&input_message);
+ XLogRecPtr lsn;
+ bool error;
+
+ lsn =
pg_lsn_in_internal(wait_for_lsn, &error);
+ if (error)
+ ereport(ERROR,
+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
+
errmsg("invalid LSN %s", wait_for_lsn)));
+ if (RecoveryInProgress())
+ WaitForLSNReplay(lsn,
0);
+ else
+ {
+ if
(GetXLogWriteRecPtr() != lsn)
+ ereport(ERROR,
+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
+
errmsg("LSN mismatch")));
+ }
+ }
pq_getmsgend(&input_message);
if (am_walsender)
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 05cb1874c58..0f23a969231 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -150,6 +150,7 @@ typedef struct Port
*/
char *database_name;
char *user_name;
+ bool wait_for_lsn_enabled;
char *cmdline_options;
List *guc_options;
diff --git a/src/interfaces/libpq/fe-connect.c
b/src/interfaces/libpq/fe-connect.c
index 64787bea511..aa56ca42b2c 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -360,6 +360,10 @@ static const internalPQconninfoOption PQconninfoOptions[]
= {
"Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") =
15 */
offsetof(struct pg_conn, target_session_attrs)},
+ {"wait_for_lsn", "PGWAITFORLSN", "0", NULL,
+ "Wait-For-LSN", "", 1,
+ offsetof(struct pg_conn, wait_for_lsn_setting)},
+
{"load_balance_hosts", "PGLOADBALANCEHOSTS",
DefaultLoadBalanceHosts, NULL,
"Load-Balance-Hosts", "", 8, /* sizeof("disable") = 8 */
@@ -1847,6 +1851,28 @@ pqConnectOptions2(PGconn *conn)
goto oom_error;
}
+ /*
+ * validate wait_for_lsn option
+ */
+ if (conn->wait_for_lsn_setting)
+ {
+ if (strcmp(conn->wait_for_lsn_setting, "on") == 0 ||
+ strcmp(conn->wait_for_lsn_setting, "true") == 0 ||
+ strcmp(conn->wait_for_lsn_setting, "1") == 0)
+ conn->wait_for_lsn_enabled = true;
+ else if (strcmp(conn->wait_for_lsn_setting, "off") == 0 ||
+ strcmp(conn->wait_for_lsn_setting, "false") ==
0 ||
+ strcmp(conn->wait_for_lsn_setting, "0") == 0)
+ conn->wait_for_lsn_enabled = false;
+ else
+ {
+ conn->status = CONNECTION_BAD;
+ libpq_append_conn_error(conn, "invalid %s value:
\"%s\"",
+
"wait_for_lsn", conn->wait_for_lsn_setting);
+ return false;
+ }
+ }
+
/*
* Only if we get this far is it appropriate to try to connect. (We
need a
* state flag, rather than just the boolean result of this function, in
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 0d224a8524e..8d2d183476b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1454,6 +1454,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool
newQuery)
/* construct the outgoing Query message */
if (pqPutMsgStart(PqMsg_Query, conn) < 0 ||
pqPuts(query, conn) < 0 ||
+ (conn->wait_for_lsn_enabled && pqPuts(conn->last_lsn, conn)) ||
pqPutMsgEnd(conn) < 0)
{
/* error message should be set up already */
diff --git a/src/interfaces/libpq/fe-protocol3.c
b/src/interfaces/libpq/fe-protocol3.c
index 8c5ac1729f0..689c869ce56 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1625,6 +1625,23 @@ getReadyForQuery(PGconn *conn)
break;
}
+ if (conn->wait_for_lsn_enabled)
+ {
+ PQExpBufferData buf;
+
+ initPQExpBuffer(&buf);
+ if (pqGets(&buf, conn))
+ {
+ termPQExpBuffer(&buf);
+ return EOF;
+ }
+ else
+ {
+ strlcpy(conn->last_lsn, buf.data, sizeof
conn->last_lsn);
+ termPQExpBuffer(&buf);
+ }
+ }
+
return 0;
}
@@ -2298,6 +2315,9 @@ build_startup_packet(const PGconn *conn, char *packet,
if (conn->client_encoding_initial && conn->client_encoding_initial[0])
ADD_STARTUP_OPTION("client_encoding",
conn->client_encoding_initial);
+ if (conn->wait_for_lsn_enabled)
+ ADD_STARTUP_OPTION("_pq_.wait_for_lsn", "1");
+
/* Add any environment-driven GUC settings needed */
for (next_eo = options; next_eo->envName; next_eo++)
{
diff --git a/src/interfaces/libpq/fe-trace.c b/src/interfaces/libpq/fe-trace.c
index 19c5b8a8900..30217b687af 100644
--- a/src/interfaces/libpq/fe-trace.c
+++ b/src/interfaces/libpq/fe-trace.c
@@ -478,6 +478,7 @@ pqTraceOutput_Query(FILE *f, const char *message, int
*cursor)
{
fprintf(f, "Query\t");
pqTraceOutputString(f, message, cursor, false);
+ /* FIXME */
}
static void
@@ -609,6 +610,7 @@ pqTraceOutput_ReadyForQuery(FILE *f, const char *message,
int *cursor)
{
fprintf(f, "ReadyForQuery\t");
pqTraceOutputByte1(f, message, cursor);
+ /* FIXME */
}
/*
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 9579f803538..71b9ad24d53 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -426,6 +426,7 @@ struct pg_conn
char *ssl_max_protocol_version; /* maximum TLS protocol version
*/
char *target_session_attrs; /* desired session properties */
char *require_auth; /* name of the expected auth method */
+ char *wait_for_lsn_setting;
char *load_balance_hosts; /* load balance over hosts */
bool cancelRequest; /* true if this connection is used to
send a
@@ -448,6 +449,7 @@ struct pg_conn
ConnStatusType status;
PGAsyncStatusType asyncStatus;
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
+ char last_lsn[128];
char last_sqlstate[6]; /* last reported SQLSTATE */
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using
nonblock
@@ -529,6 +531,7 @@ struct pg_conn
PGVerbosity verbosity; /* error/notice message verbosity */
PGContextVisibility show_context; /* whether to show CONTEXT
field */
PGlobjfuncs *lobjfuncs; /* private state for large-object
access fns */
+ bool wait_for_lsn_enabled;
pg_prng_state prng_state; /* prng state for load balancing
connections */
--
2.47.0