mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427605702



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingConnectedStateStoreTopology() {
+        driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+        final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore");
+        assertEquals("value4", store.get("key1"));
+        assertEquals("value2", store.get("key2"));
+        assertEquals("value3", store.get("key3"));
+        assertNull(store.get("key4"));
+    }
+
+    @Test
+    public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+        final String storeName = "connectedStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+            .addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
       Well, `get()` on the returned supplier can be called multiple times, 
and each call would return the same object that was passed into 
`defineWithStores`. That is incorrect: every call to `get()` must return a 
new object.
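
       To illustrate the concern, here is a minimal sketch using plain 
`java.util.function.Supplier` rather than the Kafka Streams types. 
`FakeProcessor`, `sharedInstanceSupplier`, `freshInstanceSupplier`, and 
`returnsSameInstance` are hypothetical names made up for this example; they 
are not part of the PR or of Kafka. The first helper mimics a 
`defineWithStores`-style method that captures an already-built instance, the 
second takes a factory so that each `get()` creates a new one.

```java
import java.util.function.Supplier;

final class SupplierContractSketch {

    // Hypothetical stand-in for a stateful processor; not the Kafka class.
    static final class FakeProcessor {
        final String storeName;

        FakeProcessor(final String storeName) {
            this.storeName = storeName;
        }
    }

    // Problematic: captures one instance and hands it out on every get() call.
    static Supplier<FakeProcessor> sharedInstanceSupplier(final FakeProcessor processor) {
        return () -> processor;
    }

    // Preferred: delegates to a factory, so each get() call builds a new instance.
    static Supplier<FakeProcessor> freshInstanceSupplier(final Supplier<FakeProcessor> factory) {
        return factory::get;
    }

    // Returns true if two consecutive get() calls yield the very same object.
    static boolean returnsSameInstance(final Supplier<?> supplier) {
        return supplier.get() == supplier.get();
    }

    public static void main(final String[] args) {
        final Supplier<FakeProcessor> shared =
            sharedInstanceSupplier(new FakeProcessor("connectedStore"));
        final Supplier<FakeProcessor> fresh =
            freshInstanceSupplier(() -> new FakeProcessor("connectedStore"));

        System.out.println(returnsSameInstance(shared)); // prints "true"
        System.out.println(returnsSameInstance(fresh));  // prints "false"
    }
}
```

       Under these assumptions, the shared variant would let several tasks 
end up with one processor object, which is what the comment above warns 
about; passing a factory (for example a constructor lambda) avoids that.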




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

