mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r430786602
##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>. Only such state
stores are available that (1) have been named in the
- corresponding <code class="docutils literal"><span
class="pre">KStream#process()</span></code> method call (note that this is a
different method than <code class="docutils literal"><span
class="pre">Processor#process()</span></code>),
- plus (2) all global stores. Note that global stores
do not need to be attached explicitly; however, they only
- allow for read-only access.</p>
+ calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
+ State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
+ There are two ways to connect state stores to a
processor:
+ <ul class="simple">
+ <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
+ <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+ passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code>
Review comment:
```suggestion
passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
```
##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>. Only such state
stores are available that (1) have been named in the
- corresponding <code class="docutils literal"><span
class="pre">KStream#process()</span></code> method call (note that this is a
different method than <code class="docutils literal"><span
class="pre">Processor#process()</span></code>),
- plus (2) all global stores. Note that global stores
do not need to be attached explicitly; however, they only
- allow for read-only access.</p>
+ calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
+ State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
+ There are two ways to connect state stores to a
processor:
+ <ul class="simple">
+ <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
+ <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+ passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code>
Review comment:
This is the DSL guide, so using `StreamsBuilder` seems appropriate.
##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String
name,
final ProcessorSupplier supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+ final Set<StoreBuilder<?>> stores = supplier.stores();
+ if (stores != null) {
+ for (final StoreBuilder storeBuilder : stores) {
+ internalTopologyBuilder.addStateStore(storeBuilder, name);
+ }
+ final String[] storeNames =
stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+ internalTopologyBuilder.connectProcessorAndStateStores(name,
storeNames);
Review comment:
Why do we need this? Calling `addStateStore(storeBuilder, name)` should
connect the store to the processor already?
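
To make the question concrete, here is a toy sketch. It assumes, as the comment above suggests, that `addStateStore(storeBuilder, name)` both registers the store and connects it to the named processor. `ToyTopologyBuilder` and its methods are hypothetical stand-ins written only for this illustration; they are not Kafka's `InternalTopologyBuilder`, and store builders are reduced to plain store names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of a topology builder; NOT Kafka's InternalTopologyBuilder.
class ToyTopologyBuilder {
    private final Set<String> stores = new LinkedHashSet<>();
    private final Map<String, Set<String>> storesByProcessor = new HashMap<>();

    void addProcessor(final String name) {
        storesByProcessor.putIfAbsent(name, new LinkedHashSet<>());
    }

    // Assumption under test: adding a store with processor names
    // registers the store AND connects it to each named processor.
    void addStateStore(final String storeName, final String... processorNames) {
        stores.add(storeName);
        for (final String processorName : processorNames) {
            connectProcessorAndStateStores(processorName, storeName);
        }
    }

    void connectProcessorAndStateStores(final String processorName, final String... storeNames) {
        final Set<String> connected = storesByProcessor.get(processorName);
        if (connected == null) {
            throw new IllegalArgumentException("Unknown processor: " + processorName);
        }
        for (final String storeName : storeNames) {
            if (!stores.contains(storeName)) {
                throw new IllegalArgumentException("Unknown store: " + storeName);
            }
            connected.add(storeName); // set semantics: connecting twice is a no-op
        }
    }

    Set<String> connectedStores(final String processorName) {
        final Set<String> connected = storesByProcessor.get(processorName);
        return connected == null
            ? Collections.<String>emptySet()
            : Collections.unmodifiableSet(new LinkedHashSet<>(connected));
    }
}

class RedundantConnectDemo {
    public static void main(final String[] args) {
        final ToyTopologyBuilder builder = new ToyTopologyBuilder();
        builder.addProcessor("proc");
        builder.addStateStore("store", "proc");
        final Set<String> afterAdd = builder.connectedStores("proc");

        // The extra call from the diff, applied to the toy model.
        builder.connectProcessorAndStateStores("proc", "store");
        final Set<String> afterConnect = builder.connectedStores("proc");

        System.out.println("unchanged by extra connect call: " + afterAdd.equals(afterConnect));
    }
}
```

In this model the extra `connectProcessorAndStateStores` call changes nothing, which is the behavior the comment expects. Whether the same holds for the real builder depends on its actual `addStateStore` implementation.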
##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>. Only such state
stores are available that (1) have been named in the
- corresponding <code class="docutils literal"><span
class="pre">KStream#process()</span></code> method call (note that this is a
different method than <code class="docutils literal"><span
class="pre">Processor#process()</span></code>),
- plus (2) all global stores. Note that global stores
do not need to be attached explicitly; however, they only
- allow for read-only access.</p>
+ calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
+ State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
+ There are two ways to connect state stores to a
processor:
+ <ul class="simple">
+ <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
+ <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+ passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
+ beforehand; the store will be automatically added
for you.
+ </li>
Review comment:
Should we add a sentence noting that `ConnectedStoreProvider` can also be used
for `TransformerSupplier` and `ValueTransformerSupplier`?
##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>. Only such state
stores are available that (1) have been named in the
- corresponding <code class="docutils literal"><span
class="pre">KStream#process()</span></code> method call (note that this is a
different method than <code class="docutils literal"><span
class="pre">Processor#process()</span></code>),
- plus (2) all global stores. Note that global stores
do not need to be attached explicitly; however, they only
- allow for read-only access.</p>
+ calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
+ State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
+ There are two ways to connect state stores to a
processor:
+ <ul class="simple">
+ <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
+ <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+ passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
+ beforehand; the store will be automatically added
for you.
+ </li>
Review comment:
Should we add a sentence noting that `ConnectedStoreProvider` can also be used
for `TransformerSupplier`, `ValueTransformerSupplier`, and
`ValueTransformerWithKeySupplier`?
##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in
this example, a stream processor can access any available state stores by
- calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>. Only such state
stores are available that (1) have been named in the
- corresponding <code class="docutils literal"><span
class="pre">KStream#process()</span></code> method call (note that this is a
different method than <code class="docutils literal"><span
class="pre">Processor#process()</span></code>),
- plus (2) all global stores. Note that global stores
do not need to be attached explicitly; however, they only
- allow for read-only access.</p>
+ calling <code class="docutils literal"><span
class="pre">ProcessorContext#getStateStore()</span></code>.
+ State stores are only available if they have been
connected to the processor, or if they are global stores. While global stores
do not need to be connected explicitly, they only allow for read-only access.
+ There are two ways to connect state stores to a
processor:
+ <ul class="simple">
+ <li>By passing the name of a store that has already
been added via <code class="docutils literal"><span
class="pre">Topology#addStateStore()</span></code> to the corresponding <code
class="docutils literal"><span class="pre">KStream#process()</span></code>
method call.</li>
+ <li>Implementing <code class="docutils literal"><span
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+ passed to <code class="docutils literal"><span
class="pre">KStream#process()</span></code>. In this case there is no need to
call <code class="docutils literal"><span
class="pre">StreamsBuilder#addStateStore()</span></code>
+ beforehand; the store will be automatically added
for you.
+ </li>
Review comment:
Should we add a sentence noting that `ConnectedStoreProvider` can also be used
for `TransformerSupplier` et al.?