vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r665744745



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>

Review comment:
       The whole time handling situation is slightly different now that there's 
no implicit timestamp inheritance during `process()`. I just reframed this 
whole section to first document the role of the input and output type bounds, 
to second document the new Record class, and finally to slightly reframe the 
docs about what happens during punctuation.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <p>First, we need to implement a custom stream processor, 
<code class="docutils literal"><span 
class="pre">PopularPageEmailAlert</span></code>, that implements the <code 
class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A 
processor that sends an alert message about a popular page to a configurable 
email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, 
Void&gt; {

Review comment:
       Adding the forward type bound.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
                 times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
-                    If at least one partition does not have any new data 
available, stream-time will not be advanced and thus <code class="docutils 
literal"><span class="pre">punctuate()</span></code> will not be triggered if 
<code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams 
processes records.
+                  If there are no records to process, or if Streams is waiting 
for new records
+                  due to the <a class="reference internal" 
href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and 
<code class="docutils literal"><span class="pre">punctuate()</span></code> will 
not be triggered if <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       I happened to notice that the old docs here were outdated. Streams no 
longer plays those games with stream time. It's just computed as the max 
timestamp of any record processed by the task. But the key point still stands, 
that people should be aware that stream time doesn't advance unless we process 
records.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <p>First, we need to implement a custom stream processor, 
<code class="docutils literal"><span 
class="pre">PopularPageEmailAlert</span></code>, that implements the <code 
class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A 
processor that sends an alert message about a popular page to a configurable 
email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, 
Void&gt; {
 
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext&lt;Void, Void&gt; context;
 
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
 
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext&lt;Void, Void&gt; context) {
     this.context = context;
 
     // Here you would perform any additional initializations such as setting 
up an email client.
   }
 
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record&lt;PageId, Long&gt; record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information 
about the page&#39;s ID and its view count
-    // (because the class implements `Processor&lt;PageId, Long&gt;`).
+    // In this specific example, you would be able to include
+    // information about the page&#39;s ID and its view count
   }
 
   @Override
   void close() {
-    // Any code for clean up would go here.  This processor instance will not 
be used again after this call.
+    // Any code for clean up would go here, for example tearing down the email 
client and anything
+    // else you created in the init() method
+    // This processor instance will not be used again after this call.

Review comment:
       Expanded on this point a little, since we specifically said we might 
create the email client during init.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -428,12 +463,20 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
 builder.addSource("Source", "source-topic")
     // add the WordCountProcessor node which takes the source processor as its 
upstream processor.
     // the ProcessorSupplier provides the count store associated with the 
WordCountProcessor
-    .addProcessor("Process", new ProcessorSupplier&ltString, String&gt() {
-        public Processor&ltString, String&gt get() {
+    .addProcessor("Process", new ProcessorSupplier&lt;String, String, String, 
String&gt;() {
+        public Processor&lt;String, String, String, String&gt; get() {
             return new WordCountProcessor();
         }
-        public Set&ltStoreBuilder&lt?&gt&gt stores() {
-            return countStoreBuilder;
+
+        public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
+            final StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; 
countsStoreBuilder =
+                Stores
+                    .keyValueStoreBuilder(
+                        Stores.persistentKeyValueStore("Counts"),
+                        Serdes.String(),
+                        Serdes.Long()
+                    );
+            return Collections.singleton(countsStoreBuilder);

Review comment:
       For this example, it seems more appropriate to depict a self-contained 
store definition, rather than referencing an externally defined store builder, 
as in the other example.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3446,33 +3446,35 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <p>First, we need to implement a custom stream processor, 
<code class="docutils literal"><span 
class="pre">PopularPageEmailAlert</span></code>, that implements the <code 
class="docutils literal"><span class="pre">Processor</span></code>
                     interface:</p>
                 <pre class="line-numbers"><code class="language-java">// A 
processor that sends an alert message about a popular page to a configurable 
email address
-public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
+public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, 
Void&gt; {
 
   private final String emailAddress;
-  private ProcessorContext context;
+  private ProcessorContext&lt;Void, Void&gt; context;
 
   public PopularPageEmailAlert(String emailAddress) {
     this.emailAddress = emailAddress;
   }
 
   @Override
-  public void init(ProcessorContext context) {
+  public void init(ProcessorContext&lt;Void, Void&gt; context) {
     this.context = context;
 
     // Here you would perform any additional initializations such as setting 
up an email client.
   }
 
   @Override
-  void process(PageId pageId, Long count) {
+  void process(Record&lt;PageId, Long&gt; record) {
     // Here you would format and send the alert email.
     //
-    // In this specific example, you would be able to include information 
about the page&#39;s ID and its view count
-    // (because the class implements `Processor&lt;PageId, Long&gt;`).
+    // In this specific example, you would be able to include
+    // information about the page&#39;s ID and its view count

Review comment:
       Simplified the comment a little. It seemed misleading, since the reason 
you get the page id and view count is the processor's position in the topology, 
not the input type parameters. But it also doesn't seem like an important point 
to make at all, so I dropped it.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span 
class="pre">punctuate()</span></code> method, iterate the local state store and 
send the aggregated counts to the downstream processor (we will talk about 
downstream processors later in this section), and commit the current stream 
state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() 
and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, 
timestamp -> {
+            try (final KeyValueIterator&lt;String, Integer&gt; iter = 
kvStore.all()) {
+                while (iter.hasNext()) {
+                    final KeyValue&lt;String, Integer&gt; entry = iter.next();
+                    context.forward(new Record&lt;&gt;(entry.key, 
entry.value.toString(), timestamp));
+                }
+            }
+        });
+        kvStore = context.getStateStore("Counts");
+    }
 
-      // schedule a punctuate() method every second based on stream-time
-      this.context.schedule(Duration.ofSeconds(1000), 
PunctuationType.STREAM_TIME, (timestamp) -&gt; {
-          KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-          while (iter.hasNext()) {
-              KeyValue&lt;String, Long&gt; entry = iter.next();
-              context.forward(entry.key, entry.value.toString());
-          }
-          iter.close();
+    @Override
+    public void process(final Record&lt;String, String&gt; record) {

Review comment:
       This is kind of funny: the old example was actually missing the 
`process` method! I copied the implementation over from the 
WordCountProcessorDemo.

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3492,7 +3494,6 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                     </ul>
                 </div>
                 <p>Then we can leverage the <code class="docutils 
literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the 
DSL via <code class="docutils literal"><span 
class="pre">KStream#process</span></code>.</p>
-                <p>In Java 8+, using lambda expressions:</p>

Review comment:
       We dropped support for Java 7 a while ago, so I reckon we can just 
document Java 8 now. But I didn't want to go on a crusade through the docs 
either. I touched this section because it's related to my docs task.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span 
class="pre">punctuate()</span></code> method, iterate the local state store and 
send the aggregated counts to the downstream processor (we will talk about 
downstream processors later in this section), and commit the current stream 
state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() 
and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String, String, String&gt; {

Review comment:
       Adding in the new output type bounds. The rest of the example is also 
updated to the new PAPI (type bounds on ProcessorContext, Record, etc.)

##########
File path: 
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -54,47 +52,46 @@
  * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data 
arriving in the output topic.
  */
 public final class WordCountProcessorDemo {
-
-    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String, String, String> {
+    static class WordCountProcessor implements Processor<String, String, 
String, String> {

Review comment:
       This might be out of scope here, but I visited this class as a reference 
for my docs update. It seems more natural to explicitly define the processor 
and then use a lambda for the supplier than to explicitly define the supplier 
and use a lambda for the processor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to