Thank you for the comments.
At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <[email protected]> wrote in
<[email protected]>
> Kyotaro HORIGUCHI <[email protected]> writes:
> > This is the revased and revised version of the previous patch.
>
> A few more comments:
>
> * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
> to flush all associated state. This isn't handling that case correctly.
Right, fixed.
> Even when you are given a specific hash value, I think exiting after
> finding one match is incorrect: there could be multiple connections
> to the same server with different user mappings, or vice versa.
Sure. I'm confused that key hash value nails an entry in "the
connection cache". Thank you for pointing out that.
> * I'm not really sure that it's worth the trouble to pay attention
> to the hash value at all. Very few other syscache callbacks do,
> and the pg_server/pg_user_mapping catalogs don't seem likely to
> have higher than average traffic.
Agreed to the points. But there is another point that reconection
is far intensive than re-looking up of a system catalog or maybe
even than replanning. For now I choosed to avoid a possibility of
causing massive number of simultaneous reconnection.
> * Personally I'd be inclined to register the callbacks at the same
> time the hashtables are created, which is a pattern we use elsewhere
> ... in fact, postgres_fdw itself does it that way for the transaction
> related callbacks, so it seems a bit weird that you are going in a
> different direction for these callbacks. That way avoids the need to
> depend on a _PG_init function and means that the callbacks don't have to
> worry about the hashtables not being valid.
Sure, seems more reasonable than it is now. Changed the way of
registring a callback in the attached.
> Also, it seems a bit
> pointless to have separate layers of postgresMappingSysCallback and
> InvalidateConnectionForMapping functions.
It used to be one function but it seemed a bit wierd that the
function is called from two places (two catalogs) then branchs
according to the caller. I don't have a firm opinion on this so
changed.
As the result the changes in postgres_fdw.c has been disappeard.
> * How about some regression test cases? You couldn't really exercise
> cross-session invalidation easily, but I don't think you need to.
Ha Ha. You got me. I will add some test cases for this in the
next version. Thanks.
Ashutosh, I'll register this to the next CF after providing a
regression, thanks.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
+ #include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 54,62 ----
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
+ bool invalidated; /* true if reconnect is requried */
+ uint32 server_hashvalue; /* hash value of foreign server oid */
+ uint32 mapping_hashvalue; /* hash value of user mapping oid */
} ConnCacheEntry;
/*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 73,79 ----
/* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values);
static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 83,89 ----
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 136,145 ----
*/
RegisterXactCallback(pgfdw_xact_callback, NULL);
RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ pgfdw_inval_callback, (Datum)0);
+ CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ pgfdw_inval_callback, (Datum)0);
}
/* Set flag that we did GetConnection during the current transaction */
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
! /* initialize new hashtable entry (key is already filled in) */
entry->conn = NULL;
- entry->xact_depth = 0;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- entry->changing_xact_state = false;
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
--- 154,182 ----
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
! /*
! * key is already filled in, flags well be initialized at the time of
! * making a new connection, so just clear conn here.
! */
entry->conn = NULL;
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
+
+ /*
+ * This connection is no longer valid. Disconnect such connections if no
+ * transaction is running.
+ */
+ if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+ {
+ /* reconneced immediately, so the messages is "reconnecting" */
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
+ disconnect_pg_server(entry);
+ }
+
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 195,206 ----
entry->xact_depth = 0; /* just to be sure */
entry->have_prep_stmt = false;
entry->have_error = false;
+ entry->invalidated = false;
+ entry->changing_xact_state = false;
+ entry->server_hashvalue =
+ GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+ entry->mapping_hashvalue =
+ GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 304,321 ----
return conn;
}
+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+ if (entry->conn != NULL)
+ {
+ PQfinish(entry->conn);
+ entry->conn = NULL;
+ }
+ }
+
+
/*
* For non-superusers, insist that the connstr specify a password. This
* prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 469,519 ----
}
/*
+ * Connection invalidation callback function
+ *
+ * Mark the connections to be disconnected if it depends on a foreign server
+ * or user mapping any options on which have been modified.
+ *
+ * Although most invalidate callbacks blow away all the related stuff
+ * regardless of the given hashvalue, if we blow away all existing connection
+ * from this server, it can cause a massive number of simultaneous
+ * reconnection to all the remotes. We restrict the connection to break to
+ * avoid such a mess.
+ *
+ * NB: We could avoid unnecessary disconnection more strictly by examining
+ * individual option values but it would be too-much for the gain.
+ */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+
+ if (!ConnectionHash)
+ return;
+
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ if (entry->conn == NULL)
+ continue;
+
+ if (hashvalue == 0)
+ entry->invalidated = true;
+ else
+ {
+ Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+ if ((cacheid == FOREIGNSERVEROID &&
+ entry->server_hashvalue == hashvalue) ||
+ (cacheid == USERMAPPINGOID &&
+ entry->mapping_hashvalue == hashvalue))
+ entry->invalidated = true;
+ }
+ }
+ }
+
+ /*
* Assign a "unique" number for a cursor.
*
* These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
entry->changing_xact_state)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
! PQfinish(entry->conn);
! entry->conn = NULL;
! entry->changing_xact_state = false;
}
}
--- 862,868 ----
entry->changing_xact_state)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
! disconnect_pg_server(entry);
}
}
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Form_pg_user_mapping umform;
ForeignServer *server;
! if (!entry->changing_xact_state)
return;
tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key));
if (!HeapTupleIsValid(tup))
--- 996,1009 ----
Form_pg_user_mapping umform;
ForeignServer *server;
! /* nothing to do for inactive entries and entries of sane state */
! if (entry->conn ==NULL || !entry->changing_xact_state)
return;
+ /* make sure this entry is inactive */
+ disconnect_pg_server(entry);
+
+ /* find server name to be shown in the message below */
tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key));
if (!HeapTupleIsValid(tup))
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers