mjsax commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r668331269



##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a 
Kafka record:
+            the key, value, timestamp and headers. When forwarding records, 
you can use the
+            constructor to create a new <code class="docutils literal"><span 
class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to 
replace one of the
+            <code class="docutils literal"><span 
class="pre">Record</span></code>'s properties

Review comment:
       `properties` -> `fields` (or `elements`) ?

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a 
Kafka record:

Review comment:
       `key` -> `main` (to avoid confusion with "record key") ?

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -108,8 +144,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
                 times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
                 <p class="first admonition-title"><b>Attention</b></p>
-                <p class="last">Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
-                    If at least one partition does not have any new data 
available, stream-time will not be advanced and thus <code class="docutils 
literal"><span class="pre">punctuate()</span></code> will not be triggered if 
<code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                <p class="last">Stream-time is only advanced when Streams 
processes records.
+                  If there are no records to process, or if Streams is waiting 
for new records
+                  due to the <a class="reference internal" 
href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
+                  configuration, then the stream time will not advance and 
<code class="docutils literal"><span class="pre">punctuate()</span></code> will 
not be triggered if <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.

Review comment:
       `stream time` -> `stream-time` 

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>
+            data class. This class gives you access to the key components of a 
Kafka record:
+            the key, value, timestamp and headers. When forwarding records, 
you can use the
+            constructor to create a new <code class="docutils literal"><span 
class="pre">Record</span></code>
+            from scratch, or you can use the convenience builder methods to 
replace one of the
+            <code class="docutils literal"><span 
class="pre">Record</span></code>'s properties
+            and copy over the rest. For example,
+            <code class="docutils literal"><span 
class="pre">inputRecord.withValue(newValue)</span></code>
+            would copy the key, timestamp, and headers from
+            <code class="docutils literal"><span 
class="pre">inputRecord</span></code> while
+            setting the output record's value to <code class="docutils 
literal"><span class="pre">newValue</span></code>.
+            Note that this does not mutate <code class="docutils 
literal"><span class="pre">inputRecord</span></code>,
+            but instead creates a shallow copy. Beware that this is only a 
shallow copy, so if you
+            plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
+            create a deep copy of those fields yourself.
+          </p>
+            <p>
+              In addition to handling incoming records via
+              <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>,
+              you have the option to schedule periodic invocation (called 
"punctuation")
+              in your processor's <code class="docutils literal"><span 
class="pre">init()</span></code>

Review comment:
       You can also create a punctuation within `process()`.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -119,45 +157,42 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
                 <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
                 <li>In the <code class="docutils literal"><span 
class="pre">punctuate()</span></code> method, iterate the local state store and 
send the aggregated counts to the downstream processor (we will talk about 
downstream processors later in this section), and commit the current stream 
state.</li>
             </ul>
-            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String&gt; {
-
-  private ProcessorContext context;
-  private KeyValueStore&lt;String, Long&gt; kvStore;
-
-  @Override
-  @SuppressWarnings(&quot;unchecked&quot;)
-  public void init(ProcessorContext context) {
-      // keep the processor context locally because we need it in punctuate() 
and commit()
-      this.context = context;
+            <pre class="line-numbers"><code class="language-java">public class 
WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
+    private KeyValueStore&lt;String, Integer&gt; kvStore;
 
-      // retrieve the key-value store named &quot;Counts&quot;
-      kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
+    @Override
+    public void init(final ProcessorContext&lt;String, String> context) {
+        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, 
timestamp -> {

Review comment:
       `->` -> `-&gt;` ?

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -86,12 +86,48 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#
               <code class="docutils literal"><span 
class="pre">close()</span></code> method. Note that Kafka Streams may re-use a 
single
               <code class="docutils literal"><span 
class="pre">Processor</span></code> object by calling
               <code class="docutils literal"><span 
class="pre">init()</span></code> on it again after <code class="docutils 
literal"><span class="pre">close()</span></code>.</p>
-            <p>When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-              (1) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">#process()</span></code> the output record inherits 
the input record timestamp.
-              (2) If <code class="docutils literal"><span 
class="pre">#forward()</span></code> is called within <code class="docutils 
literal"><span class="pre">punctuate()</span></code></p> the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-              Note, that <code class="docutils literal"><span 
class="pre">#forward()</span></code> also allows to change the default behavior 
by passing a custom timestamp for the output record.</p>
-            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
-                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+          <p>
+            The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface takes two sets of generic 
parameters:
+            <code class="docutils literal"><span class="pre">KIn, VIn, KOut, 
VOut</span></code>. These define the input and output types
+            that the processor implementation can handle. <code 
class="docutils literal"><span class="pre">KIn</span></code> and
+            <code class="docutils literal"><span class="pre">VIn</span></code> 
define the key and value types that will be passed
+            to <code class="docutils literal"><span 
class="pre">process()</span></code>.
+            Likewise, <code class="docutils literal"><span 
class="pre">KOut</span></code> and <code class="docutils literal"><span 
class="pre">VOut</span></code>
+            define the forwarded key and value types that <code 
class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            will accept. If your processor does not forward any records at all 
(or if it only forwards
+            <code class="docutils literal"><span 
class="pre">null</span></code> keys or values),
+            a best practice is to set the output generic type argument to
+            <code class="docutils literal"><span 
class="pre">Void</span></code>.
+            If it needs to forward multiple types that don't share a common 
superclass, you will
+            have to set the output generic type argument to <code 
class="docutils literal"><span class="pre">Object</span></code>.
+          </p>
+          <p>
+            Both the <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>
+            and the <code class="docutils literal"><span 
class="pre">ProcessorContext#forward()</span></code>
+            methods handle precords in the form of the <code class="docutils 
literal"><span class="pre">Record&lt;K, V&gt;</span></code>

Review comment:
       `precords` -> `records`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to