Heikki Linnakangas wrote:
> Magnus Hagander wrote:
>> 2010/1/17 Heikki Linnakangas <[email protected]>:
>>> We could replace the blocking PQexec() calls with PQsendQuery(), and use
>>> the emulated version of select() to wait.
>> Hmm. That would at least theoretically work, but aren't there still
>> places we may end up blocking further down? Or are those ok?
>
> There's also PQconnect that needs similar treatment (using
> PQconnectStart/Poll()), but that's it.
So here's a patch implementing that for contrib/dblink. Walreceiver
needs the same treatment.
The implementation should be shared between the two, but I'm not sure
how. We can't just put the wrapper functions to a module in
src/backend/port/, because the wrapper functions depend on libpq. Maybe
put them in a new header file as static functions, and include that in
contrib/dblink/dblink.c and src/backend/replication/libpqwalreceiver.c.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 2c1d7a2..fa11709 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -34,6 +34,14 @@
#include <limits.h>
+#ifdef WIN23
+/* These are needed by the interruptible libpq function replacements */
+#include <time.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#endif
+
#include "libpq-fe.h"
#include "fmgr.h"
#include "funcapi.h"
@@ -101,6 +109,193 @@ static void dblink_res_error(const char *conname, PGresult *res, const char *dbl
static char *get_connect_string(const char *servername);
static char *escape_param_str(const char *from);
+#ifdef WIN23
+/*
+ * Replacement functions for blocking libpq functions, for Windows.
+ *
+ * On Windows, the vanilla select() function doesn't react to our emulated
+ * signals. PQexec() and PQconnectdb() use select(), so they're
+ * uninterruptible. These replacement functions use the corresponding
+ * asynchronous libpq functions and backend version of select() to implement
+ * the same functionality, but in a way that's interrupted by signals.
+ *
+ * These work on other platforms as well, but presumably it's more efficient
+ * to let libpq block.
+ */
+
+static PGresult *
+dblink_PQexec(PGconn *conn, const char *command)
+{
+ int sock;
+ PGresult *result,
+ *lastResult;
+
+ /* Send query. This can block too, but we ignore that for now. */
+ if (PQsendQuery(conn, command) == 0)
+ return NULL;
+
+ /* Wait for response */
+ sock = PQsocket(conn);
+
+ while(PQisBusy(conn))
+ {
+ fd_set input_mask;
+
+ FD_ZERO(&input_mask);
+
+ FD_SET (sock, &input_mask);
+
+ /*
+ * Note that we don't check the return code. We assume that
+ * PQconsumeInput() will get the same error, and set the result
+ * as failed.
+ */
+ select(sock + 1, &input_mask, NULL, NULL, NULL);
+ PQconsumeInput(conn);
+ }
+
+ /*
+ * Emulate PQexec()'s behavior of returning the *last* result, if
+ * there's many. dblink doesn't normally issue statements that return
+ * multiple results, but the user-supplied SQL statement passed to
+ * dblink() might. You'll only get the last result back, so it's not a
+ * very sensible thing to do, but we must still handle it gracefully.
+ *
+ * We don't try to concatenate error messages like PQexec() does.
+ * Doesn't seem worth the effort.
+ */
+ lastResult = NULL;
+ while((result = PQgetResult(conn)) != NULL)
+ {
+ if (lastResult != NULL)
+ {
+ if (PQresultStatus(lastResult) != PGRES_COMMAND_OK &&
+ PQresultStatus(lastResult) != PGRES_TUPLES_OK)
+ {
+ PQclear(result);
+ result = lastResult;
+ }
+ else
+ PQclear(lastResult);
+ }
+ lastResult = result;
+ }
+
+ return lastResult;
+}
+
+static PGconn *
+dblink_PQconnectdb(const char *conninfo)
+{
+ PGconn *conn;
+ PostgresPollingStatusType status;
+ PQconninfoOption *options;
+ int timeout_secs = 0;
+ time_t end_time;
+ int sock;
+
+ conn = PQconnectStart(conninfo);
+ if (conn == NULL)
+ return NULL;
+
+ if (PQstatus(conn) == CONNECTION_BAD)
+ return conn;
+
+ /* Extract timeout from the connection string */
+ options = PQconninfoParse(conninfo, NULL);
+ if (options)
+ {
+ PQconninfoOption *option;
+ for (option = options; option->keyword != NULL; option++)
+ {
+ if (strcmp(option->keyword, "connect_timeout") == 0)
+ {
+ if (option->val != NULL && option->val[0] != '\0')
+ {
+ timeout_secs = atoi(option->val);
+ break;
+ }
+ }
+ }
+ PQconninfoFree(options);
+ }
+ if (timeout_secs > 0)
+ end_time = time(NULL) + timeout_secs;
+
+ sock = PQsocket(conn);
+
+ /* Wait for connection to be established */
+ for (;;)
+ {
+ fd_set input_mask;
+ fd_set output_mask;
+ time_t now;
+ struct timeval timeout;
+ struct timeval *timeout_ptr;
+
+ FD_ZERO(&input_mask);
+ FD_ZERO(&output_mask);
+
+ status = PQconnectPoll(conn);
+ switch(status)
+ {
+ case PGRES_POLLING_OK:
+ case PGRES_POLLING_FAILED:
+ return conn;
+
+ case PGRES_POLLING_READING:
+ FD_SET(sock, &input_mask);
+ break;
+
+ case PGRES_POLLING_WRITING:
+ FD_SET(sock, &output_mask);
+ break;
+
+ default:
+ elog(ERROR, "unknown PQconnectPoll() return value: %d", status);
+ }
+
+ if (timeout_secs > 0)
+ {
+ now = time(NULL);
+ timeout.tv_sec = (now > end_time) ? 0 : (end_time - now);
+ timeout.tv_usec = 0;
+ timeout_ptr = &timeout;
+ }
+ else
+ timeout_ptr = NULL;
+
+ /*
+ * Note that we don't check an error code. We assume that
+ * PQconnectPoll() will get the same error, and return failure.
+ */
+ if (select(sock + 1, &input_mask, &output_mask, NULL, timeout_ptr) == 0)
+ {
+ /* Timeout */
+ PQfinish(conn);
+
+ /*
+ * This message is subtly different from the one from the message
+ * you get on other platforms, where PQconnectdb() handles the
+ * timeout. The "timeout expired" message here gets translated
+ * using the backend .po file, while the message emitted by
+ * PQconnectdb() is translated using libpq .po file. I hope that
+ * makes no difference in practice.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not establish connection"),
+ errdetail("timeout expired")));
+ }
+ }
+ return NULL; /* not reached, keep compiler quiet */
+}
+
+#define PQexec(conn, command) dblink_PQexec(conn, command)
+#define PQconnectdb(conninfo) dblink_PQconnectdb(conninfo)
+
+#endif
+
/* Global */
static remoteConn *pconn = NULL;
static HTAB *remoteConnHash = NULL;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers