pgwhalen commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r430608121
##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########

@@ -93,8 +93,6 @@ public void process(final String dummy, final String line) {
                 this.kvStore.put(word, oldValue + 1);
             }
         }
-
-        context.commit();

Review comment:
https://github.com/apache/kafka/pull/6824#discussion_r424769188
If I were to guess, @mjsax wanted it removed because it isn't necessary and distracts from the simplest form of the example.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
##########

@@ -55,61 +60,93 @@
     private final String topic = "stream";
    private final String stateStoreName = "myTransformState";
    private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
-    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
+    private final ForeachAction<Integer, Integer> accumulateExpected = (key, value) -> results.add(KeyValue.pair(key, value));

Review comment:
I would assume it's a JUnit test that uses the [standard lifecycle](https://stackoverflow.com/a/19381563/1031507), where the class is recreated for every test. I haven't looked into the test-running infrastructure at all, though, so maybe because this is `@Category({IntegrationTest.class})` you would expect some other behavior?

##########
File path: docs/streams/developer-guide/processor-api.html
##########

@@ -409,7 +405,31 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
         as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
         to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
     </ul>
-    <p>In this topology, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an
+    <p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
+       This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+       instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+    </p>
+    <pre class="brush: java">
+    Topology builder = new Topology();
+    // add the source processor node that takes Kafka "source-topic" as input
+    builder.addSource("Source", "source-topic")
+    // add the WordCountProcessor node which takes the source processor as its upstream processor.
+    // the ProcessorSupplier provides the count store associated with the WordCountProcessor
+    .addProcessor("Process", new ProcessorSupplier<String, String>() {
+        public Processor<String, String> get() {
+            return new WordCountProcessor();
+        }
+        public Set<StoreBuilder<?>> stores() {
+            return countStoreBuilder;
+        }
+    })

Review comment:
Good catch! Will add.
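As posted, the docs snippet in that hunk would not compile: the `addProcessor` call is missing its `"Source"` parent argument (the addition referenced in a later comment below), and `stores()` is declared to return a `Set<StoreBuilder<?>>` but returns a bare `StoreBuilder`. A corrected version might look like the following sketch; it is hypothetical, not the committed docs text, and it assumes `countStoreBuilder` is the store builder defined earlier on that docs page, with imports elided as usual in the docs:

```java
Topology builder = new Topology();

// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node, which takes the source processor as its upstream
    // processor; the ProcessorSupplier also provides the count store for WordCountProcessor
    .addProcessor("Process", new ProcessorSupplier<String, String>() {
        @Override
        public Processor<String, String> get() {
            return new WordCountProcessor();
        }

        @Override
        public Set<StoreBuilder<?>> stores() {
            // stores() returns a set, so the single builder is wrapped
            return Collections.singleton(countStoreBuilder);
        }
    }, "Source")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");
```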
##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########

@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level Kafka Streams DSL,
+ * how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} to define the word
+ * count logic, and the topology is wired up through a {@link StreamsBuilder}, which more closely resembles the high-level DSL.
+ * Additionally, the {@link TransformerSupplier} specifies the {@link StoreBuilder} that the {@link Transformer} needs
+ * by implementing {@link ConnectedStoreProvider#stores()}.
+ * <p>
+ * Before running this example you must create the input topic and the output topic (e.g. via
+ * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
+ * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
+ */
+public final class WordCountTransformerDemo {

Review comment:
Great point, I'll add. I did not realize there was one for the class I was modeling (`WordCountProcessorDemo`).
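The javadoc's key point, that the supplier itself declares the store it needs so the store no longer has to be added and connected by name, can be sketched roughly as follows. This is a condensed, hypothetical version, not the demo's actual code; the class name, store name, and types are illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Collections;
import java.util.Set;

public class WordCountTransformerSupplier
        implements TransformerSupplier<String, String, KeyValue<String, Long>> {

    @Override
    public Transformer<String, String, KeyValue<String, Long>> get() {
        return new Transformer<String, String, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                // look up the store declared below by name
                store = (KeyValueStore<String, Long>) context.getStateStore("Counts");
            }

            @Override
            public KeyValue<String, Long> transform(final String key, final String word) {
                // bump and forward the running count for this word
                final Long oldCount = store.get(word);
                final long newCount = (oldCount == null ? 0L : oldCount) + 1L;
                store.put(word, newCount);
                return KeyValue.pair(word, newCount);
            }

            @Override
            public void close() { }
        };
    }

    // Because the supplier implements ConnectedStoreProvider#stores(), the
    // "Counts" store is added to the topology and connected to the transformer
    // automatically; no separate addStateStore() call or store-name argument
    // is needed at the call site.
    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.Long()));
    }
}
```

With this in place, `stream.transform(new WordCountTransformerSupplier())` should be enough for the store to be wired up, which is the convenience KIP-401 is after.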
##########
File path: docs/streams/developer-guide/processor-api.html
##########

@@ -381,22 +381,18 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
         to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output data streams out of the topology.</p>
     <p>Here is an example implementation:</p>
-    <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
-
-<span class="c1">// add the source processor node that takes Kafka topic "source-topic" as input</span>
-<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">"Source"</span><span class="o">,</span> <span class="s">"source-topic"</span><span class="o">)</span>
-
-    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor</span>
-    <span class="o">.</span><span class="na">addProcessor</span><span class="o">(</span><span class="s">"Process"</span><span class="o">,</span> <span class="o">()</span> <span class="o">-></span> <span class="k">new</span> <span class="n">WordCountProcessor</span><span class="o">(),</span> <span class="s">"Source"</span><span class="o">)</span>
-
-    <span class="c1">// add the count store associated with the WordCountProcessor processor</span>
-    <span class="o">.</span><span class="na">addStateStore</span><span class="o">(</span><span class="n">countStoreBuilder</span><span class="o">,</span> <span class="s">"Process"</span><span class="o">)</span>
-
-    <span class="c1">// add the sink processor node that takes Kafka topic "sink-topic" as output</span>
-    <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
-    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">"Sink"</span><span class="o">,</span> <span class="s">"sink-topic"</span><span class="o">,</span> <span class="s">"Process"</span><span class="o">);</span>
-</pre></div>
-    </div>
+    <pre class="brush: java">

Review comment:
After the `"Source"` addition suggested in another comment:

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########

@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
                                               final String... parentNames) {
         internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+        final Set<StoreBuilder<?>> stores = supplier.stores();
+        if (stores != null) {
+            for (final StoreBuilder storeBuilder : stores) {
+                internalTopologyBuilder.addStateStore(storeBuilder, name);
+            }
+            final String[] storeNames = stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+            internalTopologyBuilder.connectProcessorAndStateStores(name, storeNames);

Review comment:
Good point, removing.
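Applying that removal, the new block in `addProcessor` presumably reduces to the following sketch. Since `addStateStore(storeBuilder, name)` already connects the store to the named processor, the explicit `connectProcessorAndStateStores` call was redundant:

```java
final Set<StoreBuilder<?>> stores = supplier.stores();
if (stores != null) {
    for (final StoreBuilder<?> storeBuilder : stores) {
        // addStateStore with a processor name both registers the store and
        // connects it to that processor, so no separate
        // connectProcessorAndStateStores(...) call is needed
        internalTopologyBuilder.addStateStore(storeBuilder, name);
    }
}
```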