Hi All,
While looking at the patch for supporting inheritance on foreign tables, I
noticed that if a transaction makes changes to two or more foreign servers,
the current implementation in postgres_fdw doesn't ensure that either all of
them roll back or all of them commit their changes. IOW, it is possible that
some of them commit their changes while others roll back theirs.

PFA a patch which uses 2PC to solve this problem. In pgfdw_xact_callback(),
at the XACT_EVENT_PRE_COMMIT event, it prepares the transaction on all the
foreign PostgreSQL servers, and at the XACT_EVENT_COMMIT or XACT_EVENT_ABORT
event it commits or aborts those transactions, respectively.
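
As a rough illustration of the two phases described above, here is a small
standalone sketch. It is not code from the patch: the Participant type, its
prepare_will_fail flag and the function names are hypothetical stand-ins for
the foreign server connections, and no real SQL is sent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one foreign server taking part in the transaction. */
typedef enum
{
	PART_ACTIVE,				/* remote transaction open, nothing decided yet */
	PART_PREPARED,				/* PREPARE TRANSACTION succeeded */
	PART_COMMITTED,
	PART_ABORTED
} PartState;

typedef struct Participant
{
	bool		prepare_will_fail;	/* simulates e.g. a deferred constraint error */
	PartState	state;
} Participant;

/* Phase 1, roughly what happens at XACT_EVENT_PRE_COMMIT. */
bool
prepare_all(Participant *parts, size_t n)
{
	assert(parts != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
	{
		if (parts[i].prepare_will_fail)
		{
			/* A failed PREPARE TRANSACTION rolls back that remote transaction. */
			parts[i].state = PART_ABORTED;
			return false;
		}
		parts[i].state = PART_PREPARED;
	}
	return true;
}

/* Phase 2, roughly what happens at XACT_EVENT_COMMIT or XACT_EVENT_ABORT. */
void
finish_all(Participant *parts, size_t n, bool commit)
{
	for (size_t i = 0; i < n; i++)
	{
		if (parts[i].state == PART_PREPARED)	/* COMMIT/ROLLBACK PREPARED */
			parts[i].state = commit ? PART_COMMITTED : PART_ABORTED;
		else if (parts[i].state == PART_ACTIVE)	/* plain ABORT TRANSACTION */
			parts[i].state = PART_ABORTED;
	}
}

/* Commit everywhere only if every participant could be prepared. */
bool
run_two_phase_commit(Participant *parts, size_t n)
{
	bool		ok = prepare_all(parts, n);

	finish_all(parts, n, ok);
	return ok;
}
```

In this sketch a single failed prepare leaves every participant aborted, which
is the all-or-nothing behaviour the patch is aiming for.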

The logic to craft the prepared transaction ids is rudimentary and I am
open to suggestions for the same. I have the following goals in mind while
crafting the transaction ids:
1. Minimize the chances of crafting a transaction id which would conflict
with a concurrent transaction id on that foreign server.
2. Because of a limitation described later, the DBA/user should be able to
identify the server which originated a remote transaction.
More can be found in the comments above the function pgfdw_get_prep_xact_id()
in the patch.
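
To make the two goals concrete, here is a minimal standalone sketch of an id
of the form px_<nonce>_<serverid>_<userid>. The helper names
format_prep_xact_id() and parse_prep_xact_id() are hypothetical and not part of
the patch; the nonce is passed in by the caller so that the example stays
deterministic, whereas the patch derives it from random().

```c
#include <assert.h>
#include <stdio.h>

/* Maximum length of a prepared transaction id, including the terminator. */
#define PREP_XACT_ID_MAX_LEN 200

/*
 * Hypothetical helper: write "px_<nonce>_<serverid>_<userid>" into buf.
 * Returns 0 on success, -1 if the id did not fit into buflen bytes.
 */
int
format_prep_xact_id(char *buf, size_t buflen, unsigned int nonce,
					unsigned int serverid, unsigned int userid)
{
	int			n;

	assert(buf != NULL);
	n = snprintf(buf, buflen, "px_%u_%u_%u", nonce, serverid, userid);
	return (n >= 0 && (size_t) n < buflen) ? 0 : -1;
}

/*
 * Hypothetical helper for goal 2: recover the server and user ids from an id
 * found on a foreign server. Returns 0 on success, -1 if the id does not
 * have the expected shape.
 */
int
parse_prep_xact_id(const char *id, unsigned int *nonce,
				   unsigned int *serverid, unsigned int *userid)
{
	char		trailing;

	assert(id != NULL);
	/* Exactly three fields must match; a fourth match means trailing junk. */
	return sscanf(id, "px_%u_%u_%u%c", nonce, serverid, userid,
				  &trailing) == 3 ? 0 : -1;
}
```

The ids are printed as unsigned values here on the assumption that serverid
and userid are OIDs.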

Limitations
---------------
1. After a transaction has been prepared on a foreign server, if the
connection to that server is lost before the transaction is rolled back or
committed on that server, the transaction remains in the prepared state
forever. Manual intervention would be needed to clean up such a transaction
(hence goal 2 above). Automating this process would require significant
changes to the transaction manager, so I have left it out of this patch,
which I thought would be better for now. If required, I can work on that
part in this patch itself.

2. 2PC is needed only when there are two or more foreign servers involved
in a transaction. Transactions on a single foreign server are handled well
right now. So, ideally, the code should detect whether two or more foreign
servers are involved in the transaction and only then use 2PC. But I
couldn't find a way to detect that without changing the transaction manager.
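
For discussion, here is a rough sketch of the check that limitation 2 asks
for, restricted to what postgres_fdw itself can see. The ConnInfo type and
both function names are hypothetical simplifications of the connection cache
entries. The sketch knows nothing about local changes or about connections
held by other foreign data wrappers, and it counts read-only remote
transactions too, so at best it over-approximates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of one cached connection. */
typedef struct ConnInfo
{
	bool		connected;		/* mirrors "conn != NULL" in the cache entry */
	int			xact_depth;		/* 0 = no remote transaction open */
} ConnInfo;

/* Count the connections that have a remote transaction open. */
size_t
count_open_remote_xacts(const ConnInfo *conns, size_t n)
{
	size_t		count = 0;

	assert(conns != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
	{
		if (conns[i].connected && conns[i].xact_depth > 0)
			count++;
	}
	return count;
}

/* Use 2PC only when two or more foreign servers are involved. */
bool
need_two_phase(const ConnInfo *conns, size_t n)
{
	return count_open_remote_xacts(conns, n) >= 2;
}
```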
-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..9492f14 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -42,20 +42,21 @@ typedef struct ConnCacheKey
 } ConnCacheKey;
 
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
 	PGconn	   *conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	char		*prep_xact_name;	/* Name of prepared transaction on this connection */
 } ConnCacheEntry;
 
 /*
  * Connection cache (initialized on first use)
  */
 static HTAB *ConnectionHash = NULL;
 
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -135,20 +136,21 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	 * Find or create cached entry for requested connection.
 	 */
 	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
 	if (!found)
 	{
 		/* initialize new hashtable entry (key is already filled in) */
 		entry->conn = NULL;
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->prep_xact_name = NULL;
 	}
 
 	/*
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
 	 * connection is actually used.
 	 */
 
 	/*
 	 * If cache entry doesn't have a connection, we have to establish a new
@@ -156,20 +158,21 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	 * will be left in a valid empty state.)
 	 */
 	if (entry->conn == NULL)
 	{
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
 			 entry->conn, server->servername);
+		entry->prep_xact_name = NULL;
 	}
 
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
 	begin_remote_xact(entry);
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
@@ -507,20 +510,55 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		if (clear)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 	if (clear)
 		PQclear(res);
 }
 
 /*
+ * pgfdw_get_prep_xact_id
+ * The function crafts prepared transaction identifier. PostgreSQL documentation
+ * mentions two restrictions on the name
+ * 1. String literal, less than 200 bytes long.
+ * 2. Should not be same as any other concurrent prepared transaction id.
+ *
+ * The name should give enough hints to the DBA/user, in case a manual
+ * intervention is required as per the comment in its caller.
+ *
+ * To make the prepared transaction id, we should ideally use something like
+ * UUID, which gives unique ids with high probability, but that may be expensive
+ * here and the function to generate a UUID is provided by an extension, not by
+ * the core.
+ *
+ * In order to make it easy for the DBA/user to identify the originating server
+ * (so that he can verify the status of the originating transaction), we should
+ * let the DBA/user configure a prefix to be used for prepared transaction ids,
+ * but that requires changes to the core, so it is left out of this work. Instead
+ * we use the serverid and userid, which help in creating unique ids as well as
+ * give hints (albeit weak) about the originating transaction.
+ */
+static char *
+pgfdw_get_prep_xact_id(ConnCacheEntry *entry)
+{
+/* Maximum length of the prepared transaction id, borrowed from twophase.c */
+#define PREP_XACT_ID_MAX_LEN 200
+#define RANDOM_LARGE_MULTIPLIER 1000
+	char	*prep_xact_name = (char *)palloc(PREP_XACT_ID_MAX_LEN * sizeof(char));
+
+	snprintf(prep_xact_name, PREP_XACT_ID_MAX_LEN, "%s_%4d_%u_%u",
+								"px", abs(random() * RANDOM_LARGE_MULTIPLIER),
+								entry->key.serverid, entry->key.userid);
+	return prep_xact_name;
+}
+/*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
 static void
 pgfdw_xact_callback(XactEvent event, void *arg)
 {
 	HASH_SEQ_STATUS scan;
 	ConnCacheEntry *entry;
 
 	/* Quick exit if no connections were touched in this transaction. */
 	if (!xact_got_connection)
@@ -540,24 +578,48 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			continue;
 
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
 				 entry->conn);
 
 			switch (event)
 			{
+				char		*prep_xact_name;
+				StringInfo	command;
 				case XACT_EVENT_PRE_COMMIT:
-					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					/*
+					 * Prepare the transaction on the remote nodes, to check
+					 * whether the subsequent COMMIT would go through or not.
+					 * TODO:
+					 * We don't need prepared transactions when only a single
+					 * foreign server is involved. How do we know that the current
+					 * transaction involved only a single foreign server?
+					 */
 
+					prep_xact_name = pgfdw_get_prep_xact_id(entry);
+					command = makeStringInfo();
+					appendStringInfo(command, "PREPARE TRANSACTION '%s'", prep_xact_name);
+					do_sql_command(entry->conn, command->data);
+					/* The transaction got prepared, register this fact */
+					entry->prep_xact_name = prep_xact_name;
+					/*
+					 * XXX:
+					 * After this, if the server crashes or loses its connection
+					 * to the foreign server before the COMMIT/ABORT message is
+					 * sent to the foreign server, the transaction prepared above
+					 * would remain in that state till the DBA/user manually
+					 * commits or rolls it back. The automation of this step would
+					 * require changes in the core transaction manager and is left
+					 * out for now.
+					 */
 					/*
 					 * If there were any errors in subtransactions, and we
 					 * made prepared statements, do a DEALLOCATE ALL to make
 					 * sure we get rid of all prepared statements. This is
 					 * annoying and not terribly bulletproof, but it's
 					 * probably not worth trying harder.
 					 *
 					 * DEALLOCATE ALL only exists in 8.3 and later, so this
 					 * constrains how old a server postgres_fdw can
 					 * communicate with.  We intentionally ignore errors in
@@ -568,86 +630,115 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
 						res = PQexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
 					break;
 				case XACT_EVENT_PRE_PREPARE:
-
 					/*
 					 * We disallow remote transactions that modified anything,
 					 * since it's not very reasonable to hold them open until
 					 * the prepared transaction is committed.  For the moment,
 					 * throw error unconditionally; later we might allow
 					 * read-only cases.  Note that the error will cause us to
 					 * come right back here with event == XACT_EVENT_ABORT, so
 					 * we'll clean up the connection state at that point.
 					 */
 					ereport(ERROR,
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 							 errmsg("cannot prepare a transaction that modified remote tables")));
 					break;
 				case XACT_EVENT_COMMIT:
+					/*
+					 * If we prepared a transaction on a foreign server, commit
+					 * it.
+					 */
+					if (entry->prep_xact_name)
+					{
+						command = makeStringInfo();
+						appendStringInfo(command, "COMMIT PREPARED '%s'",
+													entry->prep_xact_name);
+						do_sql_command(entry->conn, command->data);
+						entry->prep_xact_name = NULL;
+					}
+					else
+						/* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
 					break;
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					if (entry->prep_xact_name)
+					{
+						command = makeStringInfo();
+						appendStringInfo(command, "ROLLBACK PREPARED '%s'", entry->prep_xact_name);
+						res = PQexec(entry->conn, command->data);
+						pfree(entry->prep_xact_name);
+						entry->prep_xact_name = NULL;
+					}
+					else
+						res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
 										   "ABORT TRANSACTION");
 					else
 					{
 						PQclear(res);
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
 							res = PQexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
 						entry->have_error = false;
 					}
 					break;
 			}
 		}
 
-		/* Reset state to show we're out of a transaction */
-		entry->xact_depth = 0;
+		/*
+		 * If we have aborted or committed the transaction, reset state to show
+		 * we're out of a transaction.
+		 */
+		if (event != XACT_EVENT_PRE_COMMIT)
+			entry->xact_depth = 0;
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
 			elog(DEBUG3, "discarding connection %p", entry->conn);
 			PQfinish(entry->conn);
 			entry->conn = NULL;
 		}
 	}
 
 	/*
-	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+	 * If we have aborted or committed the transaction, we can now mark ourselves
+	 * as out of the transaction.
 	 */
-	xact_got_connection = false;
+	if (event != XACT_EVENT_PRE_COMMIT)
+		xact_got_connection = false;
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
 }
 
 /*
  * pgfdw_subxact_callback --- cleanup at subtransaction end.
  */
 static void
 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f7e11ed..e7e9bf7 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -3063,10 +3063,122 @@ ERROR:  type "public.Colors" does not exist
 LINE 4:   "Col" public."Colors" OPTIONS (column_name 'Col')
                 ^
 QUERY:  CREATE FOREIGN TABLE t5 (
   c1 integer OPTIONS (column_name 'c1'),
   c2 text OPTIONS (column_name 'c2') COLLATE pg_catalog."C",
   "Col" public."Colors" OPTIONS (column_name 'Col')
 ) SERVER loopback
 OPTIONS (schema_name 'import_source', table_name 't5');
 CONTEXT:  importing foreign table "t5"
 ROLLBACK;
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+INSERT INTO lt VALUES (1);
+INSERT INTO lt VALUES (3);
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Next test shows local transaction fails if "any" of the remote transactions
+-- fail to commit. But any COMMITted transaction on the remote servers remains
+-- COMMITTED.
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+DROP SERVER loopback1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP SERVER loopback2 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP TABLE lt;
+\set VERBOSITY default
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index ae96684..f15d302 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -665,10 +665,95 @@ IMPORT FOREIGN SCHEMA nonesuch FROM SERVER nowhere INTO notthere;
 -- We can fake this by dropping the type locally in our transaction.
 CREATE TYPE "Colors" AS ENUM ('red', 'green', 'blue');
 CREATE TABLE import_source.t5 (c1 int, c2 text collate "C", "Col" "Colors");
 
 CREATE SCHEMA import_dest5;
 BEGIN;
 DROP TYPE "Colors" CASCADE;
 IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
   FROM SERVER loopback INTO import_dest5;  -- ERROR
 ROLLBACK;
+
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+INSERT INTO lt VALUES (1);
+INSERT INTO lt VALUES (3);
+
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+-- Next test shows local transaction fails if "any" of the remote transactions
+-- fail to commit. But any COMMITted transaction on the remote servers remains
+-- COMMITTED.
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+DROP SERVER loopback1 CASCADE;
+DROP SERVER loopback2 CASCADE;
+DROP TABLE lt;
+\set VERBOSITY default