mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+                    This can be done by implementing <code class="docutils 
literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the 
<code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Topology</span> <span 
class="n">builder</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic 
&quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="s">&quot;Source&quot;</span><span class="o">,</span> <span 
class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source 
processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { 
public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); 
} public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } 
});</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic 
&quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, 
effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may 
provide the same store with this technique, as long as the <code 
class="docutils literal"><span class="pre">StoreBuilder</span></code> is the 
same instance.</p>
+                    <p>In these topologies, the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> stream processor 
node is considered a downstream processor of the <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
+                        upstream processor of the <code class="docutils 
literal"><span class="pre">&quot;Sink&quot;</span></code> node.  As a result, 
whenever the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched 
record from
+                        Kafka to its downstream <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code 
class="docutils literal"><span 
class="pre">WordCountProcessor#process()</span></code> method is triggered to 
process the record and
+                        update the associated state store. Whenever <code 
class="docutils literal"><span class="pre">context#forward()</span></code> is 
called in the
+                        <code class="docutils literal"><span 
class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate 
key-value pair will be sent via the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> processor node to
+                        the Kafka topic <code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code 
class="docutils literal"><span class="pre">WordCountProcessor</span></code> 
implementation, you must refer to the
+                        same store name <code class="docutils literal"><span 
class="pre">&quot;Counts&quot;</span></code> when accessing the key-value 
store, otherwise an exception will be thrown at runtime,
+                        indicating that the state store cannot be found. If 
the state store is not associated with the processor
+                        in the <code class="docutils literal"><span 
class="pre">Topology</span></code> code, accessing it in the processor&#8217;s 
<code class="docutils literal"><span class="pre">init()</span></code> method 
will also throw an exception at
+                        runtime, indicating the state store is not accessible 
from this processor.</p>
+                    <p>Now that you have fully defined your processor topology 
in your application, you can proceed to
+                        <a class="reference internal" 
href="running-app.html#streams-developer-guide-execution"><span class="std 
std-ref">running the Kafka Streams application</span></a>.</p>
+                </div>

Review comment:
       Meta comment: can you also extend `streams/upgrade-guide.html`? There is a 
section on "public API" changes for the 2.6 release.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3164,10 +3164,17 @@ <h4><a id="streams_concepts_globalktable" 
href="#streams_concepts_globalktable">
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>
+                            beforehand, the store will be automatically added 
for you.

Review comment:
       nit: `,` -> `;` ?

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+                    This can be done by implementing <code class="docutils 
literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the 
<code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>, like this:

Review comment:
       nit: `in place` -> `instead` ?

##########
File path: 
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level 
Processor APIs with the high-level KStream DSL,

Review comment:
       `KStream` -> `Kafka Streams`
   
   (`KStream` is not an appropriate abbreviation for Kafka Streams, because the 
DSL uses the abstraction of `KStream` and `KTable` and thus it would be 
ambiguous.)

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+                    This can be done by implementing <code class="docutils 
literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the 
<code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Topology</span> <span 
class="n">builder</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic 
&quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="s">&quot;Source&quot;</span><span class="o">,</span> <span 
class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source 
processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { 
public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); 
} public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } 
});</span>

Review comment:
       Can we split this code (i.e., the definition of `ProcessorSupplier`) across 
multiple lines?
   ```
   builder.addProcessor("Process", new ProcessorSupplier() {
       public Processor&lt;String, String&gt; get() {
           return new WordCountProcessor();
       }
       public Set&lt;StoreBuilder&gt; stores() { 
           return countStoreBuilder;
       }
   });
   ```
   
   Also note that in the original example above, the code is indented because 
the method calls are chained:
   ```
   builder.source()
       .process()
       .addStateStore()
       .addSink();
   ```
   
   Either chain _all_ method calls, too, or chain none. If you don't chain, 
remove the indentation. Currently it would render as follows (including an 
incorrect `;` after `addProcessor()`):
   ```
   builder.addSource();
       builder.addProcessor();
       .addSink();
   ```
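
For illustration only, the two consistent layouts requested above can be sketched without a Kafka dependency. `MiniTopology` and `ChainingSketch` are hypothetical stand-ins, not the Kafka Streams `Topology` API; the only assumption carried over is that every `add*` method returns the builder, which is what makes chaining possible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fluent topology builder (NOT the Kafka Streams API).
final class MiniTopology {
    private final List<String> nodes = new ArrayList<>();

    MiniTopology addSource(String name, String topic) {
        nodes.add("source:" + name + "<-" + topic);
        return this; // returning the builder is what allows chaining
    }

    MiniTopology addProcessor(String name, String parent) {
        nodes.add("processor:" + name + "<-" + parent);
        return this;
    }

    MiniTopology addSink(String name, String topic, String parent) {
        nodes.add("sink:" + name + "->" + topic + "<-" + parent);
        return this;
    }

    // Defensive copy of the nodes in the order they were added.
    List<String> nodes() {
        return new ArrayList<>(nodes);
    }
}

final class ChainingSketch {
    // Option 1: chain every call; continuation lines are indented and
    // only the final call ends with ';'.
    static MiniTopology chained() {
        return new MiniTopology()
            .addSource("Source", "source-topic")
            .addProcessor("Process", "Source")
            .addSink("Sink", "sink-topic", "Process");
    }

    // Option 2: chain nothing; every statement names the builder,
    // ends with ';', and carries no extra indentation.
    static MiniTopology unchained() {
        MiniTopology builder = new MiniTopology();
        builder.addSource("Source", "source-topic");
        builder.addProcessor("Process", "Source");
        builder.addSink("Sink", "sink-topic", "Process");
        return builder;
    }
}
```

Both layouts build the same node list; mixing them (a `;` in the middle of an indented chain) is what would fail to compile.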

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1915,43 +2061,66 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * can be observed and additional periodic actions can be performed.
      * Note that this is a terminal operation that returns void.
      * <p>
-     * In order to assign a state, the state must be created and registered 
beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is 
available by default):
-     * <pre>{@code
+     * In order for the processor to use state stores, the stores must be 
added to the topology and connected to the
+     * processor using at least one of two strategies (though it's not 
required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the processor.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * inputStream.process(new ProcessorSupplier() { ... }, 
"myProcessorState");
+     * inputStream.process(new ProcessorSupplier() { public Processor get() { 
return new MyProcessor(); } }, "myProcessorState");
      * }</pre>
-     * Within the {@link Processor}, the state is obtained via the
+     * The second strategy is for the given {@link ProcessorSupplier} to 
implement {@link ConnectedStoreProvider#stores()},
+     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the processor.
+     * <pre>{@code
+     * class MyProcessorSupplier implements ProcessorSupplier {
+     *     // supply processor
+     *     Processor get() {
+     *         return new MyProcessor();
+     *     }
+     *
+     *     // provide store(s) that will be added and connected to the 
associated processor
+     *     Set<StoreBuilder> stores() {
+     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
+     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+     *                   Serdes.String(),
+     *                   Serdes.String());
+     *         return Collections.singleton(keyValueStoreBuilder);
+     *     }
+     * }
+     *
+     * ...
+     *
+     * inputStream.process(new MyProcessorSupplier());
+     * }</pre>
+     * <p>
+     * With either strategy, within the {@link Processor}, the state is 
obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * <pre>{@code
-     * new ProcessorSupplier() {
-     *     Processor get() {
-     *         return new Processor() {
-     *             private StateStore state;
+     * class MyProcessor implements Processor {
+     *     private StateStore state;
      *
-     *             void init(ProcessorContext context) {
-     *                 this.state = context.getStateStore("myProcessorState");
-     *                 // punctuate each second, can access this.state
-     *                 context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *             }
+     *     void init(ProcessorContext context) {
+     *         this.state = context.getStateStore("myProcessorState");
+     *         // punctuate each second, can access this.state
+     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *     }
      *
-     *             void process(K key, V value) {
-     *                 // can access this.state
-     *             }
+     *     void process(K key, V value) {
+     *         // can access this.state
+     *     }
      *
-     *             void close() {
-     *                 // can access this.state
-     *             }
-     *         }
+     *     void close() {
+     *         // can access this.state

Review comment:
       `process()` has an overload below that needs an update, too. There are 
also some `transform()` variants you missed.
   
   Overall, there should be 14 methods that need to be updated. I counted only 8 
that this PR updates.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1162,53 +1186,77 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}
      * the processing progress can be observed and additional periodic actions 
can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered 
beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is 
available by default):
+     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.flatTransform(new 
TransformerSupplier() { ... }, "myTransformState");
+     * KStream outputStream = inputStream.flatTransform(new 
TransformerSupplier() { public Transformer get() { return new MyTransformer(); 
} }, "myTransformState");

Review comment:
       As above (there are more instances below; I won't comment on each again).

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+                    This can be done by implementing <code class="docutils 
literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the 
<code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Topology</span> <span 
class="n">builder</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic 
&quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="s">&quot;Source&quot;</span><span class="o">,</span> <span 
class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source 
processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { 
public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); 
} public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } 
});</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic 
&quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, 
effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may 
provide the same store with this technique, as long as the <code 
class="docutils literal"><span class="pre">StoreBuilder</span></code> is the 
same instance.</p>

Review comment:
       nit: highlight `instance`

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.

Review comment:
       Not sure what this means: `it is connected to to the topology.`

##########
File path: 
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level 
Processor APIs with the high-level KStream DSL,
+ * how to implement the WordCount program that computes a simple word 
occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single 
partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named 
"streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic 
"streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a 
{@link Transformer} and {@link ConnectedStoreProvider}
+ * to define the application topology through a {@link StreamsBuilder}, which 
more closely resembles the high-level DSL.
+ * <p>
+ * Before running this example you must create the input topic and the output 
topic (e.g. via
+ * {@code bin/kafka-topics.sh --create ...}), and write some data to the input 
topic (e.g. via
+ * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data 
arriving in the output topic.
+ */
+public final class WordCountTransformerDemo {
+
+    static class MyTransformerSupplier implements TransformerSupplier<String, 
String, KeyValue<String, String>> {
+
+        @Override
+        public Transformer<String, String, KeyValue<String, String>> get() {
+            return new Transformer<String, String, KeyValue<String, String>>() 
{
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(final ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(Duration.ofSeconds(1), 
PunctuationType.STREAM_TIME, timestamp -> {
+                        try (final KeyValueIterator<String, Integer> iter = 
kvStore.all()) {
+                            System.out.println("----------- " + timestamp + " 
----------- ");
+
+                            while (iter.hasNext()) {
+                                final KeyValue<String, Integer> entry = 
iter.next();
+
+                                System.out.println("[" + entry.key + ", " + 
entry.value + "]");
+
+                                context.forward(entry.key, 
entry.value.toString());
+                            }
+                        }
+                    });
+                    this.kvStore = (KeyValueStore<String, Integer>) 
context.getStateStore("Counts");
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String dummy, 
final String line) {
+                    final String[] words = 
line.toLowerCase(Locale.getDefault()).split(" ");
+
+                    for (final String word : words) {
+                        final Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();

Review comment:
       I think we should remove `context.commit();` (Can we also remove it in 
`WordCountProcessorDemo`?)

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
##########
@@ -329,10 +329,11 @@ public void testAddStateStoreWithSink() {
     }
 
     @Test
-    public void testAddStateStoreWithDuplicates() {
+    public void testAddStateStoreWithDifferentInstances() {

Review comment:
       Can we improve the test name -> `shouldNotAllowToAddStoresWithSameName` 
(or similar)
   
   We should also add a test that verifies that adding the same `StoreBuilder` 
instance multiple times works.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -953,7 +977,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * flatTransform()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a {@link Transformer}
-     * @param stateStoreNames     the names of the state stores used by the 
processor
+     * @param stateStoreNames     the names of the state stores used by the 
transformer, passed only if {@link ConnectedStoreProvider#stores()} is null

Review comment:
       Why this:
   `, passed only if {@link ConnectedStoreProvider#stores()} is null` 
   
   From the KIP: 
   > A user may continue to "connect" stores to a processor by passing 
stateStoreNames when calling stream.process/transform(...). This may be used 
in combination with a Supplier that provides its own state stores by 
implementing ConnectedStoreProvider::stores().

##########
File path: 
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Demonstrates, using a {@link Transformer} which combines the low-level 
Processor APIs with the high-level KStream DSL,
+ * how to implement the WordCount program that computes a simple word 
occurrence histogram from an input text.
+ * <p>
+ * <strong>Note: This is simplified code that only works correctly for single 
partition input topics.
+ * Check out {@link WordCountDemo} for a generic example.</strong>
+ * <p>
+ * In this example, the input stream reads from a topic named 
"streams-plaintext-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic 
"streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ * <p>
+ * This example differs from {@link WordCountProcessorDemo} in that it uses a 
{@link Transformer} and {@link ConnectedStoreProvider}

Review comment:
       `uses a {@link Transformer} and {@link ConnectedStoreProvider}` -- this 
might be confusing because `TransformerSupplier extends ConnectedStoreProvider` 
and the code does not contain any direct reference to 
`ConnectedStoreProvider`... Can we rephrase it?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions 
can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered 
beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is 
available by default):
-     * <pre>{@code

Review comment:
       This line must be kept to start the code example

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -396,33 +396,52 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
     <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
     <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
 </pre></div>
+                </div>
+                <p>Here is a quick explanation of this example:</p>
+                <ul class="simple">
+                    <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                        <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                    <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                        processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                    <li>A predefined persistent key-value state store is added 
and connected to the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                        <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                    <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                        as its upstream processor and writing to a separate 
<code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users 
can also use another overloaded variant of <code class="docutils literal"><span 
class="pre">addSink</span></code>
+                        to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).</li>
+                </ul>
+                <p>
+                    In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+                    This can be done by implementing <code class="docutils 
literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the 
<code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                    in place of calling <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>, like this:
+                </p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Topology</span> <span 
class="n">builder</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic 
&quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="s">&quot;Source&quot;</span><span class="o">,</span> <span 
class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source 
processor as its upstream processor, along with its state store</span>
+    <span class="o">builder.addProcessor("Process", new ProcessorSupplier() { 
public Processor&lt;String, String&gt; get() { return new WordCountProcessor(); 
} public Set&lt;StoreBuilder&gt; stores() { return countStoreBuilder; } 
});</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic 
&quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
+</pre></div>
+                    <p>This allows for a processor to "own" state stores, 
effectively encapsulating its usage from the user wiring the topology.
+                        Multiple processors that share a state store may 
provide the same store with this technique, as long as the <code 
class="docutils literal"><span class="pre">StoreBuilder</span></code> is the 
same instance.</p>
+                    <p>In these topologies, the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> stream processor 
node is considered a downstream processor of the <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
+                        upstream processor of the <code class="docutils 
literal"><span class="pre">&quot;Sink&quot;</span></code> node.  As a result, 
whenever the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched 
record from
+                        Kafka to its downstream <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code 
class="docutils literal"><span 
class="pre">WordCountProcessor#process()</span></code> method is triggered to 
process the record and
+                        update the associated state store. Whenever <code 
class="docutils literal"><span class="pre">context#forward()</span></code> is 
called in the
+                        <code class="docutils literal"><span 
class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate 
key-value pair will be sent via the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> processor node to
+                        the Kafka topic <code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code 
class="docutils literal"><span class="pre">WordCountProcessor</span></code> 
implementation, you must refer to the
+                        same store name <code class="docutils literal"><span 
class="pre">&quot;Counts&quot;</span></code> when accessing the key-value 
store, otherwise an exception will be thrown at runtime,
+                        indicating that the state store cannot be found. If 
the state store is not associated with the processor
+                        in the <code class="docutils literal"><span 
class="pre">Topology</span></code> code, accessing it in the processor&#8217;s 
<code class="docutils literal"><span class="pre">init()</span></code> method 
will also throw an exception at
+                        runtime, indicating the state store is not accessible 
from this processor.</p>
+                    <p>Now that you have fully defined your processor topology 
in your application, you can proceed to
+                        <a class="reference internal" 
href="running-app.html#streams-developer-guide-execution"><span class="std 
std-ref">running the Kafka Streams application</span></a>.</p>
+                </div>
             </div>
-            <p>Here is a quick explanation of this example:</p>

Review comment:
       It seems you did not really change anything here? (Only the indentation? 
Why? It makes the diff hard to review.)

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -291,13 +291,19 @@ private void mockStoreBuilder() {
     }
 
     @Test
-    public void shouldNotAllowToAddStoreWithSameName() {
+    public void shouldNotAllowToAddStoreWithSameNameAndDifferentInstance() {

Review comment:
       We should add a new test `shouldAllowToShareStoreUsingSameStoreBuilder`
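   
   To make the intended rule concrete, here is a rough model of it in plain Java. It is only a sketch: `NamedBuilder` and `StoreRegistrySketch` are hypothetical stand-ins, not Kafka classes, and the exception type is an assumption. It shows the two cases the tests should cover: re-adding the same builder instance is accepted, while a different instance with the same name is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store builder; only the name matters here.
class NamedBuilder {
    final String name;

    NamedBuilder(final String name) {
        this.name = name;
    }
}

// Hypothetical registry that models the "same name, same instance" rule.
class StoreRegistrySketch {
    private final Map<String, NamedBuilder> builders = new HashMap<>();

    // Re-adding the identical instance is a no-op;
    // a different instance with the same name is rejected.
    void add(final NamedBuilder builder) {
        final NamedBuilder existing = builders.get(builder.name);
        if (existing != null && existing != builder) {
            throw new IllegalArgumentException(
                "Store " + builder.name + " was already added by another builder instance.");
        }
        builders.put(builder.name, builder);
    }

    int size() {
        return builders.size();
    }

    public static void main(final String[] args) {
        final StoreRegistrySketch registry = new StoreRegistrySketch();
        final NamedBuilder shared = new NamedBuilder("Counts");

        registry.add(shared);
        registry.add(shared); // same instance: accepted
        System.out.println(registry.size());

        try {
            registry.add(new NamedBuilder("Counts")); // different instance: rejected
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   The real tests would of course go through `Topology` and assert on whatever exception the builder actually throws.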

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions 
can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered 
beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is 
available by default):
-     * <pre>{@code
+     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() 
{ ... }, "myTransformState");
+     * KStream outputStream = inputStream.transform(new TransformerSupplier() 
{ public Transformer get() { return new MyTransformer(); } }, 
"myTransformState");

Review comment:
       Should be multiple lines to improve readability.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -885,49 +887,71 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()},
      * the processing progress can be observed and additional periodic actions 
can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered 
beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is 
available by default):
-     * <pre>{@code
+     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
+     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
+     * access to global state stores is available by default).
+     * <p>
+     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
+     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
      *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
-     * // register store
+     * // add store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() 
{ ... }, "myTransformState");
+     * KStream outputStream = inputStream.transform(new TransformerSupplier() 
{ public Transformer get() { return new MyTransformer(); } }, 
"myTransformState");
      * }</pre>
-     * Within the {@link Transformer}, the state is obtained via the {@link 
ProcessorContext}.
+     * The second strategy is for the given {@link TransformerSupplier} to 
implement {@link ConnectedStoreProvider#stores()},
+     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
+     * <pre>{@code
+     * class MyTransformerSupplier implements TransformerSupplier {
+     *     // supply transformer
+     *     Transformer get() {
+     *         return new MyTransformer();
+     *     }
+     *
+     *     // provide store(s) that will be added and connected to the 
associated transformer

Review comment:
       Should we add a comment to highlight that the store name 
`"myTransformState"` from the builder is used to access the store via the 
`context` later?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -1231,7 +1279,7 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a {@link Transformer}
      * @param named               a {@link Named} config used to name the 
processor in the topology
-     * @param stateStoreNames     the names of the state stores used by the 
processor
+     * @param stateStoreNames     the names of the state stores used by the 
transformer, passed only if {@link ConnectedStoreProvider#stores()} is null

Review comment:
       As above

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -723,6 +775,20 @@ public void process(final String key, final String value) {
         return () -> processor;
     }
 
+    private <K, V> ProcessorSupplier<K, V> defineWithStores(final Processor<K, 
V> processor, final Set<StoreBuilder> stores) {
+        return new ProcessorSupplier<K, V>() {
+            @Override
+            public Processor<K, V> get() {
+                return processor;

Review comment:
       This does not seem to be a good implementation. A supplier is supposed 
to return a new instance on each `get()` call. It might not be an issue now, 
but it might be error-prone in the future. 
   
   Maybe accept a supplier as input instead of a `Processor`?
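   
   A minimal sketch of the difference, in plain Java with no Kafka types. `CountingWorker` and `SupplierContractSketch` are hypothetical names used only for illustration; the helper in the test would take a `ProcessorSupplier` (or similar) instead.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful processor; not a Kafka class.
class CountingWorker {
    private int seen = 0;

    int handle() {
        return ++seen;
    }
}

class SupplierContractSketch {
    // Problematic: captures one instance and hands it out on every get().
    static Supplier<CountingWorker> sharing(final CountingWorker worker) {
        return () -> worker;
    }

    // Preferred: delegates to a factory, so each get() yields a fresh instance.
    static Supplier<CountingWorker> fresh(final Supplier<CountingWorker> factory) {
        return factory::get;
    }

    public static void main(final String[] args) {
        final Supplier<CountingWorker> shared = sharing(new CountingWorker());
        final Supplier<CountingWorker> perCall = fresh(CountingWorker::new);

        System.out.println(shared.get() == shared.get());   // true: state is shared
        System.out.println(perCall.get() == perCall.get()); // false: independent instances
    }
}
```

   With the first variant, any state held by the processor leaks between every node that calls `get()`, which is the situation I would like the test helper to avoid.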

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingConnectedStateStoreTopology() {
+        driver = new 
TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", 
"value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", 
"value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", 
"value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", 
"value4"));
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("connectedStore");
+        assertEquals("value4", store.get("key1"));
+        assertEquals("value2", store.get("key2"));
+        assertEquals("value3", store.get("key3"));
+        assertNull(store.get("key4"));
+    }
+
+    @Test
+    public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+        final String storeName = "connectedStore";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String());
+        topology
+            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
+            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_2)
+            .addProcessor("processor1", defineWithStores(new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+            .addProcessor("processor2", defineWithStores(new 
StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
       The same `StatefulProcessor` instance would be used in each processor 
step -- I would expect this to be problematic (cf. my comment below on 
`defineWithStores()`)
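
   To illustrate the concern, here is a minimal sketch. It uses hypothetical stand-in types (`CountingProcessor`, `SupplierSketch`), not the actual Kafka Streams classes or the PR's `defineWithStores()` helper. A supplier that wraps an already-constructed instance returns the same object on every `get()`, so state leaks between steps, whereas a supplier that wraps a factory returns a fresh object each time:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful processor; NOT the Kafka Streams API.
class CountingProcessor {
    private int seen = 0;

    // Returns how many records this particular instance has processed so far.
    int process(final String value) {
        return ++seen;
    }
}

// Hypothetical helper that contrasts the two ways of building a supplier.
class SupplierSketch {
    // Wraps an already-constructed instance: every get() hands out the same object.
    static Supplier<CountingProcessor> fromInstance(final CountingProcessor processor) {
        return () -> processor;
    }

    // Wraps a factory: every get() constructs a fresh, independent object.
    static Supplier<CountingProcessor> fromFactory(final Supplier<CountingProcessor> factory) {
        return factory::get;
    }
}
```

   With `fromInstance(new CountingProcessor())`, two `get()` calls share one counter. With `fromFactory(CountingProcessor::new)`, each call starts from zero. Whether the test helper in this PR behaves like the first variant would need to be checked against its actual implementation.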

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new 
StatefulProcessorNode<>(
                 name,
                 new ProcessorParameters<>(processorSupplier, name),
-                stateStoreNames
+                getStoreNamesAndMaybeAddStores(processorSupplier, 
stateStoreNames)
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
+    /**
+     * Provides store names that should be connected to a {@link 
StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), 
transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the 
Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s 
will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final 
ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
       Why not make `varargsStoreNames` a vararg? Wouldn't the `null` check below 
be unnecessary then?
   
   Why do we actually "unwrap" the stores here and add them one-by-one to the 
builder/topology?
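
   For reference, a small sketch of how Java treats a varargs parameter. `VarargsSketch` and `isNull` are hypothetical names used only for illustration and are not part of the PR. A call with no arguments receives an empty array, but a caller can still pass an explicit `null` array:

```java
// Hypothetical illustration of Java varargs semantics; not code from the PR.
class VarargsSketch {
    // Reports whether the varargs array itself is null.
    static boolean isNull(final String... names) {
        return names == null;
    }
}
```

   `VarargsSketch.isNull()` and `VarargsSketch.isNull("a")` both return `false`, while `VarargsSketch.isNull((String[]) null)` returns `true`. So the `null` check could only be dropped if no caller ever forwards a `null` array.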

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -549,6 +593,14 @@ private Topology createStatefulTopology(final String 
storeName) {
             .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
+    private Topology createConnectedStateStoreTopology(final String storeName) 
{
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String());

Review comment:
       nit: line too long

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
##########
@@ -55,61 +60,92 @@
     private final String topic = "stream";
     private final String stateStoreName = "myTransformState";
     private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
-    private final ForeachAction<Integer, Integer> action = (key, value) -> 
results.add(KeyValue.pair(key, value));
+    private final ForeachAction<Integer, Integer> accumulateExpected = (key, 
value) -> results.add(KeyValue.pair(key, value));
     private KStream<Integer, Integer> stream;
 
     @Before
     public void before() {
         builder = new StreamsBuilder();
-        final StoreBuilder<KeyValueStore<Integer, Integer>> 
keyValueStoreBuilder =
-                
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
-                                            Serdes.Integer(),
-                                            Serdes.Integer());
-        builder.addStateStore(keyValueStoreBuilder);
         stream = builder.stream(topic, Consumed.with(Serdes.Integer(), 
Serdes.Integer()));
     }
 
+    private StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder() {
+        return 
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.Integer());
+    }
+
     private void verifyResult(final List<KeyValue<Integer, Integer>> expected) 
{
         final ConsumerRecordFactory<Integer, Integer> recordFactory =
             new ConsumerRecordFactory<>(new IntegerSerializer(), new 
IntegerSerializer());
         final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(topic, Arrays.asList(new 
KeyValue<>(1, 1),
-                                                                       new 
KeyValue<>(2, 2),
-                                                                       new 
KeyValue<>(3, 3),
-                                                                       new 
KeyValue<>(2, 1),
-                                                                       new 
KeyValue<>(2, 3),
-                                                                       new 
KeyValue<>(1, 3))));
+                new KeyValue<>(2, 2),

Review comment:
       If you change the indentation, move the first `new KeyValue<>(1, 1)` to its 
own line.




