This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
     new 106a3d7  KAFKA-7833: Add Global/StateStore name conflict check (#8825)
106a3d7 is described below

commit 106a3d7d87b9d8296ef86d9340ea3a9d9d9b54fd
Author: Rob Meng <rob.xu.m...@gmail.com>
AuthorDate: Wed Jun 10 17:06:22 2020 -0700

    KAFKA-7833: Add Global/StateStore name conflict check (#8825)

    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         |  3 ++
 .../internals/InternalTopologyBuilderTest.java     | 36 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7dc2df1..8d69ef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -517,6 +517,9 @@ public class InternalTopologyBuilder {
         if (!allowOverride && stateFactory != null && stateFactory.builder != storeBuilder) {
             throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name());
         }
+        if (globalStateBuilders.containsKey(storeBuilder.name())) {
+            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeBuilder.name());
+        }
 
         stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 67f1537..8402dfd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -381,6 +381,42 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
+    public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
+        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+        builder.addGlobalStore(
+            storeBuilder,
+            "global-store",
+            null,
+            null,
+            null,
+            "global-topic",
+            "global-processor",
+            new MockProcessorSupplier<>());
+        try {
+            builder.addStateStore(storeBuilder);
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
+    }
+
+    @Test
+    public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
+        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+        builder.addStateStore(storeBuilder);
+        try {
+            builder.addGlobalStore(
+                storeBuilder,
+                "global-store",
+                null,
+                null,
+                null,
+                "global-topic",
+                "global-processor",
+                new MockProcessorSupplier<>());
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
+    }
+
+    @Test
     public void testAddStateStore() {
         builder.addStateStore(storeBuilder);
         builder.setApplicationId("X");