On 2021/01/18 12:33, Bharath Rupireddy wrote:
On Sun, Jan 17, 2021 at 11:30 PM Zhihong Yu <z...@yugabyte.com> wrote:
This patch introduces new function postgres_fdw_disconnect() when
called with a foreign server name discards the associated
connections with the server name.

I think the following would read better:

This patch introduces a new function postgres_fdw_disconnect(). When
called with a foreign server name, it discards the associated
connections with the server.

Thanks. I corrected the commit message.

Please note the removal of the 'name' at the end - connection is with server, 
not server name.

+       if (is_in_use)
+           ereport(WARNING,
+                   (errmsg("cannot close the connection because it is still in 
use")));

It would be better to include servername in the message.

The user would already have provided the server name in
postgres_fdw_disconnect('myserver'), so I don't think we need to repeat
it in the warning. The existing warning seems fine.

+               ereport(WARNING,
+                       (errmsg("cannot close all connections because some of them 
are still in use")));

I think showing the number of active connections would be more informative.
This can be achieved by changing active_conn_exists from bool to int (named 
active_conns, e.g.):

+       if (entry->conn && !active_conn_exists)
+           active_conn_exists = true;

Instead of setting the bool value, active_conns can be incremented.

IMO, the number of active connections is not informative, because
users cannot do anything with it. What would actually be more
informative is to list all the server names for which the connections
are active, instead of the warning - "cannot close all connections because
some of them are still in use". Having said that, I feel like it's
overkill to do that for now. If required, we can enhance the warnings
in the future. Thoughts?

Attaching the v11 patch set, with changes only in 0001. The changes are
the commit message correction and moving the warning-related code from
postgres_fdw_disconnect to disconnect_cached_connections.

Please review v11 further.

Thanks for updating the patch!

The patch for postgres_fdw_get_connections() basically looks good to me,
so I'd like to push it first. Attached is a patch containing the
postgres_fdw_get_connections() part, which I extracted from the 0001
patch and tweaked. Thoughts?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index ee8a80a392..c1b0cad453 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
 
 REGRESS = postgres_fdw
 
diff --git a/contrib/postgres_fdw/connection.c 
b/contrib/postgres_fdw/connection.c
index eaedfea9f2..a1404cb6bb 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -16,12 +16,14 @@
 #include "access/xact.h"
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
+#include "funcapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postgres_fdw.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/builtins.h"
 #include "utils/datetime.h"
 #include "utils/hsearch.h"
 #include "utils/inval.h"
@@ -74,6 +76,11 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/*
+ * SQL functions
+ */
+PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -1335,3 +1342,131 @@ exit:   ;
                *result = last_res;
        return timed_out;
 }
+
+/*
+ * List active foreign server connections.
+ *
+ * This function takes no input parameter and returns setof record made of
+ * following values:
+ * - server_name - server name of active connection. In case the foreign server
+ *   is dropped but still the connection is active, then the server name will
+ *   be NULL in output.
+ * - valid - true/false representing whether the connection is valid or not.
+ *      Note that the connections can get invalidated in pgfdw_inval_callback.
+ *
+ * No records are returned when there are no cached connections at all.
+ */
+Datum
+postgres_fdw_get_connections(PG_FUNCTION_ARGS)
+{
+#define POSTGRES_FDW_GET_CONNECTIONS_COLS      2 /* server_name, valid */
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       HASH_SEQ_STATUS scan;
+       ConnCacheEntry *entry;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context 
that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is 
not allowed in this context")));
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       /* Build the tuplestore in per-query memory to hold the result rows */
+       per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       /* If the connection cache doesn't exist, we return no records */
+       if (!ConnectionHash)
+       {
+               /* clean up and return the (empty) tuplestore */
+               tuplestore_donestoring(tupstore);
+
+               PG_RETURN_VOID();
+       }
+
+       hash_seq_init(&scan, ConnectionHash);
+       while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+       {
+               ForeignServer *server;
+               Datum           values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
+               bool            nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
+
+               /* We only look for open remote connections */
+               if (!entry->conn)
+                       continue;
+
+               server = GetForeignServerExtended(entry->serverid, 
FSV_MISSING_OK);
+
+               MemSet(values, 0, sizeof(values));
+               MemSet(nulls, 0, sizeof(nulls));
+
+               /*
+                * The foreign server may have been dropped in current explicit
+                * transaction. It is not possible to drop the server from another
+                * session when the connection associated with it is in use in the
+                * current transaction, if tried so, the drop query in another session
+                * blocks until the current transaction finishes.
+                *
+                * Even though the server is dropped in the current transaction, the
+                * cache can still have associated active connection entry, say we
+                * call such connections dangling. Since we cannot fetch the server
+                * name from system catalogs for dangling connections, instead we
+                * show NULL value for server name in output.
+                *
+                * We could have done better by storing the server name in the cache
+                * entry instead of server oid so that it could be used in the output.
+                * But the server name in each cache entry requires 64 bytes of
+                * memory, which is huge, when there are many cached connections and
+                * the use case i.e. dropping the foreign server within the explicit
+                * current transaction seems rare. So, we chose to show NULL value for
+                * server name in output.
+                *
+                * Such dangling connections get closed either in next use or at the
+                * end of current explicit transaction in pgfdw_xact_callback.
+                */
+               if (!server)
+               {
+                       /*
+                        * If the server has been dropped in the current explicit
+                        * transaction, then this entry would have been invalidated in
+                        * pgfdw_inval_callback at the end of DROP SERVER command. Note
+                        * that this connection would not have been closed in
+                        * pgfdw_inval_callback because it is still being used in the
+                        * current explicit transaction. So, assert that here.
+                        */
+                       Assert(entry->conn && entry->xact_depth > 0 && 
entry->invalidated);
+
+                       /* Show null, if no server name was found */
+                       nulls[0] = true;
+               }
+               else
+                       values[0] = CStringGetTextDatum(server->servername);
+
+               /* "valid" column is the negation of the entry's invalidated flag */
+               values[1] = BoolGetDatum(!entry->invalidated);
+
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       /* clean up and return the tuplestore */
+       tuplestore_donestoring(tupstore);
+
+       PG_RETURN_VOID();
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..418addcc4c 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -13,12 +13,18 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
        OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -129,6 +135,11 @@ CREATE FOREIGN TABLE ft6 (
        c2 int NOT NULL,
        c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft7 (
+       c1 int NOT NULL,
+       c2 int NOT NULL,
+       c3 text
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -199,7 +210,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS 
(column_name 'C 1');
  public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3') | 
  public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
  public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
-(5 rows)
+ public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4') | 
+(6 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9040,6 +9052,13 @@ DROP PROCEDURE terminate_backend_and_wait(text);
 -- ===================================================================
 -- This test case is for closing the connection in pgfdw_xact_callback
 BEGIN;
+-- List all the existing cached connections. Only loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
 -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
 SELECT 1 FROM ft1 LIMIT 1;
  ?column? 
@@ -9047,9 +9066,49 @@ SELECT 1 FROM ft1 LIMIT 1;
         1
 (1 row)
 
+SELECT 1 FROM ft7 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback and loopback3
+-- also should be output as valid connections.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+ loopback3   | t
+(3 rows)
+
 -- Connection is not closed at the end of the alter statement in
 -- pgfdw_inval_callback. That's because the connection is in midst of this
 -- xact, it is just marked as invalid.
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+DROP SERVER loopback3 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to user mapping for postgres on server loopback3
+drop cascades to foreign table ft7
+-- List all the existing cached connections. loopback and loopback3
+-- should be output as invalid connections. Also the server name for
+-- loopback3 should be NULL because the server was dropped.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | f
+ loopback2   | t
+             | f
+(3 rows)
+
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- List all the existing cached connections. loopback and loopback3
+-- should not be output because they should be closed at the end of
+-- the above transaction.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql 
b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
new file mode 100644
index 0000000000..7f85784466
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
@@ -0,0 +1,10 @@
+/* contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.1'" to load this file. 
\quit
+
+CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
+    OUT valid boolean)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/postgres_fdw/postgres_fdw.control 
b/contrib/postgres_fdw/postgres_fdw.control
index f9ed490752..d489382064 100644
--- a/contrib/postgres_fdw/postgres_fdw.control
+++ b/contrib/postgres_fdw/postgres_fdw.control
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.0'
+default_version = '1.1'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql 
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..c1a7f57222 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -15,6 +15,11 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+
     END;
 $d$;
 
@@ -22,6 +27,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
        OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -142,6 +148,12 @@ CREATE FOREIGN TABLE ft6 (
        c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft7 (
+       c1 int NOT NULL,
+       c2 int NOT NULL,
+       c3 text
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -2703,11 +2715,26 @@ DROP PROCEDURE terminate_backend_and_wait(text);
 -- ===================================================================
 -- This test case is for closing the connection in pgfdw_xact_callback
 BEGIN;
+-- List all the existing cached connections. Only loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
 SELECT 1 FROM ft1 LIMIT 1;
+SELECT 1 FROM ft7 LIMIT 1;
+-- List all the existing cached connections. loopback and loopback3
+-- also should be output as valid connections.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- Connection is not closed at the end of the alter statement in
 -- pgfdw_inval_callback. That's because the connection is in midst of this
 -- xact, it is just marked as invalid.
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+DROP SERVER loopback3 CASCADE;
+-- List all the existing cached connections. loopback and loopback3
+-- should be output as invalid connections. Also the server name for
+-- loopback3 should be NULL because the server was dropped.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- List all the existing cached connections. loopback and loopback3
+-- should not be output because they should be closed at the end of
+-- the above transaction.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..6a91926da8 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -479,6 +479,38 @@ OPTIONS (ADD password_required 'false');
   </sect3>
  </sect2>
 
+<sect2>
+  <title>Functions</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><function>postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record</function></term>
+    <listitem>
+     <para>
+      This function returns the foreign server names of all the open
+      connections that <filename>postgres_fdw</filename> established from
+      the local session to the foreign servers. It also returns whether
+      each connection is valid or not. <literal>false</literal> is returned
+      if the foreign server connection is used in the current local
+      transaction but its foreign server or user mapping has been changed
+      or dropped; such an invalid connection will be closed at
+      the end of that transaction. <literal>true</literal> is returned
+      otherwise. If there are no open connections, no record is returned.
+      Example usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback1   | t
+ loopback2   | f
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+   </variablelist>
+
+</sect2>
+
  <sect2>
   <title>Connection Management</title>
 

Reply via email to