vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r665744745
##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
 <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
 <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
 <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
-<p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
-(1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
-(2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
-Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
-<p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
-API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
+<p>

Review comment:
   The whole time handling situation is slightly different now that there's no implicit timestamp inheritance during `process()`. I just reframed this whole section to first document the role of the input and output type bounds, to second document the new Record class, and finally to slightly reframe the docs about what happens during punctuation.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> interface:</p>
 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor<PageId, Long> {
+public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> {

Review comment:
   Adding the forward type bound.
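As background for the comment above about timestamp handling and the new Record class, a minimal sketch of the Record-based API may help. It is not part of the PR: the `UppercaseProcessor` class name, the punctuation interval, and the forwarded key/value are invented for illustration.

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor illustrating the new Record-based forwarding:
// the timestamp travels with the Record itself instead of being inherited implicitly.
public class UppercaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // During a punctuation there is no "current input record", so a forwarded
        // Record must carry an explicit timestamp (here, the punctuation timestamp).
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp ->
            context.forward(new Record<>("tick", "heartbeat", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        // Forwarding a transformed copy of the input Record keeps its timestamp (and headers);
        // withTimestamp() could override it if a different output timestamp were wanted.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
```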
##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
 <div class="admonition attention">
 <p class="first admonition-title"><b>Attention</b></p>
-<p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
-If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+<p class="last">Stream-time is only advanced when Streams processes records.
+If there are no records to process, or if Streams is waiting for new records
+due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
   I happened to notice that the old docs here were outdated. Streams no longer plays those games with stream time. It's just computed as the max timestamp of any record processed by the task. But the key point still stands, that people should be aware that stream time doesn't advance unless we process records.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> interface:</p>
 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor<PageId, Long> {
+public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> {
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext<Void, Void> context;
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext<Void, Void> context) {
     this.context = context;
     // Here you would perform any additional initializations such as setting up an email client.
   }
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record<PageId, Long> record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information about the page's ID and its view count
-    // (because the class implements `Processor<PageId, Long>`).
+    // In this specific example, you would be able to include
+    // information about the page's ID and its view count
   }
   @Override
   void close() {
-    // Any code for clean up would go here. This processor instance will not be used again after this call.
+    // Any code for clean up would go here, for example tearing down the email client and anything
+    // else you created in the init() method
+    // This processor instance will not be used again after this call.

Review comment:
   Expanded on this point a little, since we specifically said we might create the email client during init.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -428,12 +463,20 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a
 builder.addSource("Source", "source-topic")
     // add the WordCountProcessor node which takes the source processor as its upstream processor.
     // the ProcessorSupplier provides the count store associated with the WordCountProcessor
-    .addProcessor("Process", new ProcessorSupplier<String, String>() {
-        public Processor<String, String> get() {
+    .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
+        public Processor<String, String, String, String> get() {
            return new WordCountProcessor();
        }
-        public Set<StoreBuilder<?>> stores() {
-            return countStoreBuilder;
+
+        public Set<StoreBuilder<?>> stores() {
+            final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
+                Stores
+                    .keyValueStoreBuilder(
+                        Stores.persistentKeyValueStore("Counts"),
+                        Serdes.String(),
+                        Serdes.Long()
+                    );
+            return Collections.singleton(countsStoreBuilder);

Review comment:
   For this example, it seems more appropriate to depict a self-contained store definition, rather than referencing an externally defined store builder, as in the other example.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
 <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> interface:</p>
 <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
-public class PopularPageEmailAlert implements Processor<PageId, Long> {
+public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> {
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext<Void, Void> context;
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext<Void, Void> context) {
     this.context = context;
     // Here you would perform any additional initializations such as setting up an email client.
   }
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record<PageId, Long> record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information about the page's ID and its view count
-    // (because the class implements `Processor<PageId, Long>`).
+    // In this specific example, you would be able to include
+    // information about the page's ID and its view count

Review comment:
   Simplified the comment a little. It seemed misleading, since the reason you get the page id and view count is the processor's position in the topology, not the input type parameters. But it also doesn't seem like an important point to make at all, so I dropped it.
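For contrast with the self-contained `stores()` definition shown in the hunk above, the "other example" the comment alludes to (an externally defined store builder wired through the Topology) would look roughly like the following. This is an illustrative sketch rather than text from the PR: the class name is invented, `WordCountProcessor` is the processor from the surrounding docs example, and the use of `addStateStore` plus `connectProcessorAndStateStores` is one common way to do such wiring.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ExternallyDefinedStoreSketch {

    public static Topology build() {
        // The store builder lives outside the ProcessorSupplier here...
        final StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.Long());

        return new Topology()
            .addSource("Source", "source-topic")
            // ...so it has to be registered on the topology explicitly...
            .addStateStore(countStoreBuilder)
            .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
                @Override
                public Processor<String, String, String, String> get() {
                    // WordCountProcessor is the processor class from the docs example.
                    return new WordCountProcessor();
                }
            }, "Source")
            // ...and wired to the processor by name, rather than being returned
            // from ProcessorSupplier#stores() as in the self-contained variant above.
            .connectProcessorAndStateStores("Process", "Counts");
    }
}
```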
##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
 </ul>
-<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String> {
-
-    private ProcessorContext context;
-    private KeyValueStore<String, Long> kvStore;
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
-        // keep the processor context locally because we need it in punctuate() and commit()
-        this.context = context;
+<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String, String, String> {
+    private KeyValueStore<String, Integer> kvStore;
-        // retrieve the key-value store named "Counts"
-        kvStore = (KeyValueStore) context.getStateStore("Counts");
+    @Override
+    public void init(final ProcessorContext<String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
+            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
+                while (iter.hasNext()) {
+                    final KeyValue<String, Integer> entry = iter.next();
+                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
+                }
+            }
+        });
+        kvStore = context.getStateStore("Counts");
+    }
-        // schedule a punctuate() method every second based on stream-time
-        this.context.schedule(Duration.ofSeconds(1000), PunctuationType.STREAM_TIME, (timestamp) -> {
-            KeyValueIterator<String, Long> iter = this.kvStore.all();
-            while (iter.hasNext()) {
-                KeyValue<String, Long> entry = iter.next();
-                context.forward(entry.key, entry.value.toString());
-            }
-            iter.close();
+    @Override
+    public void process(final Record<String, String> record) {

Review comment:
   This is kind of funny: the old example was actually missing the `process` method! I copied the implementation over from the WordCountProcessorDemo.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3492,7 +3494,6 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
 </ul>
 </div>
 <p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p>
-<p>In Java 8+, using lambda expressions:</p>

Review comment:
   We dropped support for Java 7 a while ago, so I reckon we can just document Java 8 now. But I didn't want to go on a crusade through the docs either. I touched this section because it's related to my docs task.
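The sentence about `KStream#process` that survives the deletion above may read better with a concrete call site nearby. The following is a hedged sketch, not content from the PR: the class and method names and the stream variable are invented, `PageId` and `PopularPageEmailAlert` are the placeholder types from the docs example, and it assumes the Kafka 3.0 overload of `KStream#process` that accepts the new `org.apache.kafka.streams.processor.api.ProcessorSupplier`.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public class PopularPageAlertingSketch {

    // pagesWithCounts is assumed to be a stream of popular pages and their view counts.
    static void attachAlerting(final KStream<PageId, Long> pagesWithCounts) {
        // An explicitly typed supplier variable makes it unambiguous that the new
        // ProcessorSupplier overload of KStream#process is the one being called.
        final ProcessorSupplier<PageId, Long, Void, Void> alertSupplier =
            () -> new PopularPageEmailAlert("alerts@yourcompany.com");

        // The processor only consumes records (hence the Void output types);
        // nothing is forwarded further downstream.
        pagesWithCounts.process(alertSupplier);
    }
}
```

Defining the processor class explicitly and supplying it through a small lambda is also the style the final review comment below argues for.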
##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
 <li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
 <li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
 </ul>
-<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String> {
-
-    private ProcessorContext context;
-    private KeyValueStore<String, Long> kvStore;
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
-        // keep the processor context locally because we need it in punctuate() and commit()
-        this.context = context;
+<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor<String, String, String, String> {

Review comment:
   Adding in the new output type bounds. The rest of the example is also updated to the new PAPI (type bounds on ProcessorContext, Record, etc.)

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -54,47 +52,46 @@
  * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
  */
 public final class WordCountProcessorDemo {
-
-    static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
+    static class WordCountProcessor implements Processor<String, String, String, String> {

Review comment:
   This might be out of scope here, but I visited this class as a reference for my docs update. It seems more natural to explicitly define the processor and then use a lambda for the supplier than to explicitly define the supplier and use a lambda for the processor.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org