pgwhalen commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r426309015
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ########## @@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() { assertNull(store.get("key4")); } + @Test + public void testDrivingConnectedStateStoreTopology() { + driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1")); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2")); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3")); + driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4")); + assertNoOutputRecord(OUTPUT_TOPIC_1); + + final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore"); + assertEquals("value4", store.get("key1")); + assertEquals("value2", store.get("key2")); + assertEquals("value3", store.get("key3")); + assertNull(store.get("key4")); + } + + @Test + public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() { + final String storeName = "connectedStore"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()); + topology + .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) + .addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1") + .addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2") Review comment: I'm a little confused - the code `new`s different `StatefulProcessor`s for each step, so I don't see how it would be reused. It would certainly be a risk if the result of `defineWithStores` is reused, but in practice it is not; it's just a sort of macro for defining tests. I was just trying to emulate the existing `define`, which has the same issue. Perhaps I should make your suggested change as well as remove `define()` entirely? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org