This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ba02e8c KAFKA-9244: Update FK reference should unsubscribe old FK (#7758) ba02e8c is described below commit ba02e8c6b6802262646a7d6287c7a2c237be65fd Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Fri Nov 29 21:21:06 2019 -0800 KAFKA-9244: Update FK reference should unsubscribe old FK (#7758) Reviewers: Adam Bellemare <adam.bellem...@wishabi.com>, John Roesler <j...@confluent.io> --- .../SubscriptionStoreReceiveProcessorSupplier.java | 4 +- .../KTableKTableForeignKeyJoinIntegrationTest.java | 71 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java index 0a86980..9cbeadd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java @@ -92,9 +92,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO> final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp()); final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey); - //If the subscriptionWrapper hash indicates a null, must delete from statestore. //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier - if (value.getHash() == null) { + if (value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) || + value.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) { store.delete(subscriptionKey); } else { store.put(subscriptionKey, newValue); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 80c0f52..746d6b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -426,6 +426,77 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } } + @Test + public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() { + final Topology topology = getTopology(streamsConfig, "store", leftJoin); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); + final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); + final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); + final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + + // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference + // then populate update on RHS + right.pipeInput("rhs1", "rhsValue1"); + right.pipeInput("rhs2", "rhsValue2"); + + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + assertThat( + asMap(store), + is(emptyMap()) + ); + + left.pipeInput("lhs1", "lhsValue1|rhs1"); + { + final Map<String, String> expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") + ); + assertThat( + outputTopic.readKeyValuesToMap(), + is(expected) + ); + assertThat( + asMap(store), + is(expected) + ); + } + + // Change LHS foreign key reference + left.pipeInput("lhs1", "lhsValue1|rhs2"); + { + final Map<String, String> expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") + ); + assertThat( + outputTopic.readKeyValuesToMap(), + is(expected) + ); + assertThat( + asMap(store), + is(expected) + ); + } + + // Populate RHS update on old LHS foreign key ref + right.pipeInput("rhs1", "rhsValue1Delta"); + { + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + assertThat( + asMap(store), + is(mkMap( + mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") + )) + ); + } + } + } + private static Map<String, String> asMap(final KeyValueStore<String, String> store) { final HashMap<String, String> result = new HashMap<>(); store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));