On 2020/10/11 9:16, Tom Lane wrote:
> Fujii Masao <[email protected]> writes:
>> Therefore, the easy fix is to make libpq mark the connection as
>> CONNECTION_BAD even in ECONNABORTED, like we do in ECONNRESET.
>
> So in the wake of commit fe27009cb,

Many thanks for the commit fe27009cb !!

> this is what lorikeet is doing:
>
> @@ -9028,9 +9028,7 @@
>  CALL terminate_backend_and_wait('fdw_retry_check');
>  SAVEPOINT s;
>  SELECT 1 FROM ft1 LIMIT 1; -- should fail
> -ERROR: server closed the connection unexpectedly
> - This probably means the server terminated abnormally
> - before or while processing the request.
> +ERROR: could not receive data from server: Software caused connection abort
>  CONTEXT: remote SQL command: SAVEPOINT s2
>  COMMIT;
>  -- Clean up
>
> which is better --- the connection recovery is working --- but
> obviously still not a "pass".
>
> The only way to make this test pass as-is is to hack libpq so that
> it deems ECONNABORTED to be a server crash, which frankly I do not
> think is a good change. I don't see any principled reason to think
> that it means that rather than a network connectivity failure.
>
> What I've done instead as a stopgap is to adjust the test case to be
> insensitive to the exact error message text.

For now I haven't come up with a better idea than the current fix.

> Maybe there's a better
> way, but I can't think of anything besides having two (or more?)
> expected-output files. That would be quite unpalatable as things
> stand, though perhaps we could make it tolerable by splitting this
> test off into a second test script.
>
> Meanwhile, now that I've looked at commit 32a9c0bdf, I'm not very
> happy with it:
>
> * The control flow seems rather forced. I think it was designed
> on the assumption that reindenting the existing code is forbidden.
> Why not use an actual loop, instead of a goto? I also think that
> it's far from obvious that the loop isn't an infinite loop; it
> took me quite a while to glom onto the fact that the test inside the
> PG_CATCH avoids that by checking retry_conn. Maybe a comment would
> be enough to improve that, but I feel the control logic could stand
> a rethink.

Isn't it simpler and easier to read to just reestablish a new connection
in the retry case, instead of using a loop, since we retry only once?
Patch attached.
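
To illustrate the retry-once shape outside of the backend, here is a minimal standalone sketch. It is not the patch code: FakeEntry and the fake_* functions are hypothetical stand-ins for ConnCacheEntry, make_new_connection(), disconnect_pg_server() and begin_remote_xact(), and failure is reported by a return value instead of ereport().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for ConnCacheEntry; not the real struct. */
typedef struct
{
    bool open;      /* do we currently hold a connection? */
    bool broken;    /* has the cached connection gone bad? */
    int  connects;  /* number of connections established so far */
} FakeEntry;

static void
fake_make_new_connection(FakeEntry *entry)
{
    assert(!entry->open);   /* mirrors Assert(entry->conn == NULL) */
    entry->open = true;
    entry->broken = false;
    entry->connects++;
}

static void
fake_disconnect(FakeEntry *entry)
{
    entry->open = false;
}

/* Returns false on connection failure instead of throwing an error. */
static bool
fake_begin_remote_xact(FakeEntry *entry, bool server_down)
{
    return entry->open && !entry->broken && !server_down;
}

static bool
fake_get_connection(FakeEntry *entry, bool server_down)
{
    if (!entry->open)
        fake_make_new_connection(entry);

    if (fake_begin_remote_xact(entry, server_down))
        return true;

    /* Retry exactly once on a fresh connection: no loop, no goto. */
    fake_disconnect(entry);
    fake_make_new_connection(entry);
    return fake_begin_remote_xact(entry, server_down);
}
```

Because the second attempt is written out as straight-line code, it is obvious from reading it that there can be at most two attempts.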

> * The CATCH case is completely failing to clean up after itself.
> At the minimum, there has to be a FlushErrorState() there.
> I don't think it'd be terribly hard to drive this code to an
> error-stack-overflow PANIC.

You're right. Sorry, I forgot to take this into consideration :(
I fixed this issue in the attached patch.

> * As is so often true of proposed patches in which PG_CATCH is
> thought to be an adequate way to catch an error, this is just
> unbelievably fragile. It will break, and badly so, if it catches
> an error that is anything other than what it is expecting ...
> and it's not even particularly trying to verify that the error is
> what it's expecting. It might be okay, or at least a little bit
> harder to break, if it verified that the error's SQLSTATE is
> ERRCODE_CONNECTION_FAILURE.

"PQstatus()==CONNECTION_BAD" was checked for that purpose. But that's not
enough?
Anyway, in the patch, I changed the code so that
sqlstate==ERRCODE_CONNECTION_FAILURE
is checked.
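
The idea of swallowing only the expected error and re-throwing everything else can be sketched in plain C with setjmp/longjmp. This is only a rough model of the PG_TRY/PG_CATCH pattern under simplifying assumptions, not backend code: the error codes, throw_error(), try_begin() and run_top_level() are all made-up names, and resetting pending_error merely plays the role that FlushErrorState() plays in the patch.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical error codes standing in for SQLSTATE values. */
enum { ERR_NONE = 0, ERR_CONNECTION_FAILURE = 1, ERR_OTHER = 2 };

static jmp_buf *handler = NULL;      /* innermost active handler */
static int pending_error = ERR_NONE; /* error currently being propagated */

typedef void (*action_fn)(void);

/* Record the error code and jump to the innermost handler. */
static void
throw_error(int code)
{
    pending_error = code;
    assert(handler != NULL);
    longjmp(*handler, 1);
}

/*
 * Run the action. Returns true on success, false if a connection failure
 * at transaction depth 0 was caught and cleared (the caller should retry).
 * Any other error is re-thrown to the outer handler.
 */
static bool
try_begin(action_fn action, int xact_depth)
{
    jmp_buf  local;
    jmp_buf *saved = handler;

    if (setjmp(local) == 0)
    {
        handler = &local;
        action();
        handler = saved;
        return true;
    }

    handler = saved;                 /* restore the outer handler first */
    if (pending_error != ERR_CONNECTION_FAILURE || xact_depth > 0)
        throw_error(pending_error);  /* not what we expected: re-throw */

    pending_error = ERR_NONE;        /* clean up the error state */
    return false;
}

/* Top-level harness: returns the error code that escaped, if any. */
static int
run_top_level(action_fn action, int xact_depth, bool *retry)
{
    jmp_buf top;

    handler = &top;
    if (setjmp(top) == 0)
    {
        *retry = !try_begin(action, xact_depth);
        handler = NULL;
        return ERR_NONE;
    }
    handler = NULL;
    return pending_error;
}

/* Sample actions for trying the sketch out. */
static void ok_action(void) { }
static void conn_fail(void) { throw_error(ERR_CONNECTION_FAILURE); }
static void other_fail(void) { throw_error(ERR_OTHER); }
```

In this model only conn_fail at depth 0 results in a retry request; other_fail, or conn_fail inside a subtransaction (depth > 0), propagates to the caller unchanged.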
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 76994f3820..8bbad61c3b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -74,6 +74,7 @@ static unsigned int prep_stmt_number = 0;
static bool xact_got_connection = false;
/* prototypes of private functions */
+static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values,
UserMapping *user);
@@ -108,9 +109,10 @@ PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt)
{
bool found;
- volatile bool retry_conn = false;
+ bool retry = false;
ConnCacheEntry *entry;
ConnCacheKey key;
+ MemoryContext ccxt = CurrentMemoryContext;
/* First time through, initialize connection cache hashtable */
if (ConnectionHash == NULL)
@@ -160,23 +162,14 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
-retry:
-
/*
* If the connection needs to be remade due to invalidation, disconnect as
- * soon as we're out of all transactions. Also, if previous attempt to
- * start new remote transaction failed on the cached connection,
- * disconnect it to retry a new connection.
+ * soon as we're out of all transactions.
*/
- if ((entry->conn != NULL && entry->invalidated &&
- entry->xact_depth == 0) || retry_conn)
+ if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
{
- if (retry_conn)
- elog(DEBUG3, "closing connection %p to reestablish a new one",
- entry->conn);
- else
- elog(DEBUG3, "closing connection %p for option changes to take effect",
- entry->conn);
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
disconnect_pg_server(entry);
}
@@ -186,58 +179,67 @@ retry:
* will remain in a valid empty state, ie conn == NULL.)
*/
if (entry->conn == NULL)
- {
- ForeignServer *server = GetForeignServer(user->serverid);
-
- /* Reset all transient state fields, to be sure all are clean */
- entry->xact_depth = 0;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- entry->changing_xact_state = false;
- entry->invalidated = false;
- entry->server_hashvalue =
- GetSysCacheHashValue1(FOREIGNSERVEROID,
- ObjectIdGetDatum(server->serverid));
- entry->mapping_hashvalue =
- GetSysCacheHashValue1(USERMAPPINGOID,
- ObjectIdGetDatum(user->umid));
-
- /* Now try to make the connection */
- entry->conn = connect_pg_server(server, user);
-
- elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
- entry->conn, server->servername, user->umid, user->userid);
- }
+ make_new_connection(entry, user);
/*
* We check the health of the cached connection here when starting a new
- * remote transaction. If a broken connection is detected in the first
- * attempt, we try to reestablish a new connection. If broken connection
- * is detected again here, we give up getting a connection.
+ * remote transaction. If a broken connection is detected, we try to
+ * reestablish a new connection later.
*/
PG_TRY();
{
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry);
- retry_conn = false;
}
PG_CATCH();
{
- if (PQstatus(entry->conn) != CONNECTION_BAD ||
- entry->xact_depth > 0 ||
- retry_conn)
+ MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+ ErrorData *errdata = CopyErrorData();
+
+ /*
+ * The error code ERRCODE_CONNECTION_FAILURE indicates a broken
+ * connection. If it's detected when starting a new remote transaction
+ * (not subtransaction), new connection will be reestablished later.
+ */
+ if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
+ entry->xact_depth > 0)
+ {
+ MemoryContextSwitchTo(ecxt);
PG_RE_THROW();
- retry_conn = true;
+ }
+
+ /* Clean up the error state */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+
+ retry = true;
}
PG_END_TRY();
- if (retry_conn)
+ /*
+ * If a broken connection is detected, disconnect it, reestablish a new
+ * connection and retry a new remote transaction. If connection failure
+ * is reported again, we give up getting a connection.
+ */
+ if (retry)
{
+ Assert(PQstatus(entry->conn) == CONNECTION_BAD);
+ Assert(entry->xact_depth == 0);
+
ereport(DEBUG3,
(errmsg_internal("could not start remote transaction on connection %p",
entry->conn)),
errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
- goto retry;
+
+ elog(DEBUG3, "closing connection %p to reestablish a new one",
+ entry->conn);
+ disconnect_pg_server(entry);
+
+ if (entry->conn == NULL)
+ make_new_connection(entry, user);
+
+ begin_remote_xact(entry);
}
/* Remember if caller will prepare statements */
@@ -246,6 +248,37 @@ retry:
return entry->conn;
}
+/*
+ * Reset all transient state fields in the cached connection entry and
+ * establish new connection to the remote server.
+ */
+static void
+make_new_connection(ConnCacheEntry *entry, UserMapping *user)
+{
+ ForeignServer *server = GetForeignServer(user->serverid);
+
+ Assert(entry->conn == NULL);
+
+ /* Reset all transient state fields, to be sure all are clean */
+ entry->xact_depth = 0;
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ entry->changing_xact_state = false;
+ entry->invalidated = false;
+ entry->server_hashvalue =
+ GetSysCacheHashValue1(FOREIGNSERVEROID,
+ ObjectIdGetDatum(server->serverid));
+ entry->mapping_hashvalue =
+ GetSysCacheHashValue1(USERMAPPINGOID,
+ ObjectIdGetDatum(user->umid));
+
+ /* Now try to make the connection */
+ entry->conn = connect_pg_server(server, user);
+
+ elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
+ entry->conn, server->servername, user->umid, user->userid);
+}
+
/*
* Connect to remote server using specified server and user mapping properties.
*/