This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 106a3d7  KAFKA-7833: Add Global/StateStore name conflict check (#8825)
106a3d7 is described below

commit 106a3d7d87b9d8296ef86d9340ea3a9d9d9b54fd
Author: Rob Meng <rob.xu.m...@gmail.com>
AuthorDate: Wed Jun 10 17:06:22 2020 -0700

    KAFKA-7833: Add Global/StateStore name conflict check (#8825)
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         |  3 ++
 .../internals/InternalTopologyBuilderTest.java     | 36 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7dc2df1..8d69ef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -517,6 +517,9 @@ public class InternalTopologyBuilder {
         if (!allowOverride && stateFactory != null && stateFactory.builder != storeBuilder) {
             throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name());
         }
+        if (globalStateBuilders.containsKey(storeBuilder.name())) {
+            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeBuilder.name());
+        }
 
         stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 67f1537..8402dfd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -381,6 +381,42 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
+    public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
+        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+        builder.addGlobalStore(
+                storeBuilder,
+                "global-store",
+                null,
+                null,
+                null,
+                "global-topic",
+                "global-processor",
+                new MockProcessorSupplier<>());
+        try {
+            builder.addStateStore(storeBuilder);
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
+    }
+
+    @Test
+    public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
+        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+        builder.addStateStore(storeBuilder);
+        try {
+            builder.addGlobalStore(
+                    storeBuilder,
+                    "global-store",
+                    null,
+                    null,
+                    null,
+                    "global-topic",
+                    "global-processor",
+                    new MockProcessorSupplier<>());
+            fail("Should throw TopologyException with store name conflict");
+        } catch (final TopologyException expected) { /* ok */ }
+    }
+
+    @Test
     public void testAddStateStore() {
         builder.addStateStore(storeBuilder);
         builder.setApplicationId("X");

Reply via email to