pgwhalen commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r430608121



##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -93,8 +93,6 @@ public void process(final String dummy, final String line) {
                             this.kvStore.put(word, oldValue + 1);
                         }
                     }
-
-                    context.commit();

Review comment:
       https://github.com/apache/kafka/pull/6824#discussion_r424769188
   
   If I were to guess, @mjsax wanted it removed because it isn't necessary and distracts from the simplest form of the example.
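
   For reference, a minimal sketch (the configured value is illustrative) of why the explicit call is unnecessary: Streams already commits on the `commit.interval.ms` interval.

```java
final Properties props = new Properties();
// Kafka Streams commits processed offsets automatically on this interval
// (default 30000 ms), so process() never needs to call context.commit()
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
```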

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
##########
@@ -55,61 +60,93 @@
     private final String topic = "stream";
     private final String stateStoreName = "myTransformState";
     private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
-    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
+    private final ForeachAction<Integer, Integer> accumulateExpected = (key, value) -> results.add(KeyValue.pair(key, value));

Review comment:
       I would assume it's a JUnit test that uses the [standard lifecycle](https://stackoverflow.com/a/19381563/1031507), where the class is recreated for every test. I haven't looked into the test-running infrastructure at all, though, so maybe because this is `@Category({IntegrationTest.class})` you would expect some other behavior?
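
   For illustration, a self-contained sketch of that lifecycle assumption (class name hypothetical): JUnit 4 creates a fresh test-class instance per test method, so instance fields like `results` start empty each time.

```java
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import org.junit.Test;

public class LifecycleSketchTest {
    // JUnit 4 instantiates the test class once per test method,
    // so this list is empty at the start of every test
    private final List<Integer> results = new ArrayList<>();

    @Test
    public void firstTestSeesAFreshList() {
        assertTrue(results.isEmpty());
        results.add(1);
    }

    @Test
    public void secondTestAlsoSeesAFreshList() {
        assertTrue(results.isEmpty());
    }
}
```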

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -409,7 +405,31 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                     as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
                     to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
             </ul>
-            <p>In this topology, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
+            <p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
+                This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
+            </p>
+            <pre class="brush: java">
+                Topology builder = new Topology();
+                // add the source processor node that takes Kafka "source-topic" as input
+                builder.addSource("Source", "source-topic")
+                    // add the WordCountProcessor node which takes the source processor as its upstream processor.
+                    // the ProcessorSupplier provides the count store associated with the WordCountProcessor
+                    .addProcessor("Process", new ProcessorSupplier&lt;String, String&gt;() {
+                        public Processor&lt;String, String&gt; get() {
+                            return new WordCountProcessor();
+                        }
+                        public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
+                            return Collections.singleton(countStoreBuilder);
+                        }
+                    }, "Source")

Review comment:
       Good catch! Will add.
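
   For completeness, a hedged sketch of the `countStoreBuilder` the example presumes (the store name and serdes are illustrative, not taken from the PR):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final StoreBuilder<KeyValueStore<String, Integer>> countStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),  // illustrative store name
        Serdes.String(),
        Serdes.Integer());
```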

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level Kafka Streams DSL,
+ * how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} to define the word
+ * count logic, and the topology is wired up through a {@link StreamsBuilder}, which more closely resembles the high-level DSL.
+ * Additionally, the {@link TransformerSupplier} specifies the {@link StoreBuilder} that the {@link Transformer} needs
+ * by implementing {@link ConnectedStoreProvider#stores()}.
+ * <p>
+ * Before running this example you must create the input topic and the output topic (e.g. via
+ * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
+ * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
+ */
+public final class WordCountTransformerDemo {

Review comment:
       Great point, I'll add. I did not realize there was one for the class I was modeling (`WordCountProcessorDemo`).
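
   For illustration, a minimal sketch of the DSL wiring such a demo might use (`MyTransformerSupplier` is a hypothetical name; because the supplier implements `ConnectedStoreProvider#stores()`, no store names need to be passed to `transform()`):

```java
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
    // the supplier's stores() implementation wires up the state store,
    // so transform() needs no explicit store-name arguments here
    .transform(new MyTransformerSupplier())
    .to("streams-wordcount-processor-output");
```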

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -381,22 +381,18 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
                 to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
                 output data streams out of the topology.</p>
             <p>Here is an example implementation:</p>
-            <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
-
-<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
-<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
-
-    <span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor</span>
-    <span class="o">.</span><span class="na">addProcessor</span><span class="o">(</span><span class="s">&quot;Process&quot;</span><span class="o">,</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">WordCountProcessor</span><span class="o">(),</span> <span class="s">&quot;Source&quot;</span><span class="o">)</span>
-
-    <span class="c1">// add the count store associated with the WordCountProcessor processor</span>
-    <span class="o">.</span><span class="na">addStateStore</span><span class="o">(</span><span class="n">countStoreBuilder</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">)</span>
-
-    <span class="c1">// add the sink processor node that takes Kafka topic &quot;sink-topic&quot; as output</span>
-    <span class="c1">// and the WordCountProcessor node as its upstream processor</span>
-    <span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
-</pre></div>
-            </div>
+            <pre class="brush: java">

Review comment:
       After the `"Source"` addition suggested in another comment:
   
   
![screencapture-localhost-25-documentation-streams-developer-guide-processor-api-html-2020-05-26-13_27_24](https://user-images.githubusercontent.com/3172488/82937232-6e501500-9f55-11ea-96ed-26a2a4c0457c.png)

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
                                               final String... parentNames) {
         internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+        final Set<StoreBuilder<?>> stores = supplier.stores();
+        if (stores != null) {
+            for (final StoreBuilder storeBuilder : stores) {
+                internalTopologyBuilder.addStateStore(storeBuilder, name);
+            }
+            final String[] storeNames = stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+            internalTopologyBuilder.connectProcessorAndStateStores(name, storeNames);

Review comment:
       Good point, removing.
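
   A sketch of the simplified version this implies, assuming `InternalTopologyBuilder#addStateStore(storeBuilder, name)` already connects the store to the named processor, which would make the explicit connect call redundant:

```java
final Set<StoreBuilder<?>> stores = supplier.stores();
if (stores != null) {
    for (final StoreBuilder<?> storeBuilder : stores) {
        // registers the store and connects it to this processor in one call,
        // so no separate connectProcessorAndStateStores(...) is needed
        internalTopologyBuilder.addStateStore(storeBuilder, name);
    }
}
```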




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

