I found bug in logical replication where extra (nullable) columns on
subscriber will be reset to NULL value when update comes from provider.

The issue is apparently that we /points finger at himself/ forgot to
check specifically for columns that are not part of attribute map in
slot_modify_cstrings() so the extra columns will fall through to the
else block which sets the value to NULL.

Attached patch fixes it and adds couple of tests for this scenario.

This is rather serious issue so it would be good if we could get it
fixed in 10.1.

  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From c641eb6170f3dec26cf52264e1f931393aee434e Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 26 Oct 2017 11:11:42 +0200
Subject: [PATCH] Don't reset additional columns on subscriber to NULL after

When publisher tables has fewer columns than subscriber, the update of
row on publisher should result in update of only common columns.
Previous coding mistakenly reset the values of additonal columns on
subscriber to NULL because it failed to skip updates of columns not
found in attribute map.
 src/backend/replication/logical/worker.c   |  7 ++-
 src/test/subscription/t/008_diff_schema.pl | 77 ++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)
 create mode 100644 src/test/subscription/t/008_diff_schema.pl

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bc6d8246a7..0e68670767 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,10 +391,13 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
 		int			remoteattnum = rel->attrmap[i];
-		if (remoteattnum >= 0 && !replaces[remoteattnum])
+		if (remoteattnum < 0)
-		if (remoteattnum >= 0 && values[remoteattnum] != NULL)
+		if (!replaces[remoteattnum])
+			continue;
+		if (values[remoteattnum] != NULL)
 			Oid			typinput;
 			Oid			typioparam;
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
new file mode 100644
index 0000000000..a6b4597403
--- /dev/null
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -0,0 +1,77 @@
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+# Create some preexisting content on publisher
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz default now(), d bigint default 999)");
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+my $appname = 'tap_sub';
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+# update the tuples on publisher and check the additional columns on
+# subscriber didn't change
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check extra columns contain local defaults');
+# change the local values of extra on subscriber, update publisher and check
+# subscriber retains the expected values
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check extra columns contain locally changed data');
+# done

Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:

Reply via email to