mjsax commented on code in PR #18314:
URL: https://github.com/apache/kafka/pull/18314#discussion_r1896960916
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
Review Comment:
It seems switching from `KStream#transform()` to `KStream#process()` is the
easy part. I guess the more interesting question is, how to rewrite
`CustomTransformer` to `CustomProcessor()` ?
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
+ }
+
+ @Override
+ public void process(String key, String value) {
+ Iterable<KeyValue<String, String>> results =
customFlatTransformation(key, value);
+ for (KeyValue<String, String> result : results) {
+ context.forward(result.key, result.value);
Review Comment:
Needs an update
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
+ }
+
+ @Override
+ public void process(String key, String value) {
Review Comment:
```suggestion
public void process(Record<String, String> record) {
```
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
Review Comment:
New new `api.Processor` has 4 generic, key/value in/out.
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
+ }
+
+ @Override
+ public void process(String key, String value) {
+ Iterable<KeyValue<String, String>> results =
customFlatTransformation(key, value);
Review Comment:
I think it would be difficult for user to reason what
`customFlatTransformation` is?
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
+ }
+
+ @Override
+ public void process(String key, String value) {
+ Iterable<KeyValue<String, String>> results =
customFlatTransformation(key, value);
+ for (KeyValue<String, String> result : results) {
+ context.forward(result.key, result.value);
+ }
+ }
+
+ @Override
+ public void close() {
Review Comment:
Can be omitted as it's empty.
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
Review Comment:
Might be good to just fill in the code? We need to store a reference to
`context`
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
Review Comment:
The new `api.ProcessorContext` has `<KOut, VOut>` generic types.
##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -3243,6 +3244,177 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable
Foreign-Key
// `Processor` interface, see further down below.
.process(() -> new
PopularPageEmailAlert("[email protected]"));</code></pre>
</div>
+ <div class="section" id="migrating-from-transform-to-process">
+ <h2>
+ <a class="headerlink"
href="#migrating-from-transform-to-process" title="Permalink to this headline">
+ Migrating from transform to process
+ </a>
+ </h2>
+ <div>
+ <p>
+ As of Kafka 4.0, several deprecated methods in the Kafka
Streams API, such as <code>transform</code>,
+ <code>flatTransform</code>, <code>transformValues</code>,
and <code>flatTransformValues</code>, have
+ been removed. These methods have been replaced with the
more versatile <code>process</code> API. This
+ guide provides detailed steps for migrating existing code
to use the new <code>process</code> API and
+ explains the benefits of the changes.
+ </p>
+ <h3>Overview of Changes</h3>
+ <p>The following deprecated methods are no longer available in
Kafka Streams:</p>
+ <ul>
+ <li><code>KStream#transform</code></li>
+ <li><code>KStream#flatTransform</code></li>
+ <li><code>KStream#transformValues</code></li>
+ <li><code>KStream#flatTransformValues</code></li>
+ </ul>
+ <p>The <code>process</code> API now serves as a unified
replacement for all these methods. It simplifies the
+ API surface while maintaining support for both stateless
and stateful operations.</p>
+
+ <h3>Migration Steps</h3>
+
+ <h4>1. Migrate from <code>transform</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedStream = stream.transform(
+ () -> new CustomTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new CustomProcessor(),
+ "state-store"
+);</code></pre>
+
+ <h4>2. Migrate from <code>flatTransform</code> to
<code>process</code></h4>
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> flatTransformedStream = stream.flatTransform(
+ () -> new CustomFlatTransformer(),
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic here
+ }
+
+ @Override
+ public void process(String key, String value) {
+ Iterable<KeyValue<String, String>> results =
customFlatTransformation(key, value);
+ for (KeyValue<String, String> result : results) {
+ context.forward(result.key, result.value);
+ }
+ }
+
+ @Override
+ public void close() {
+ // Cleanup logic here
+ }
+ },
+ "state-store"
+);</code></pre>
+
+ <h4>3. Migrate from <code>transformValues</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String, String> transformedValuesStream = stream.transformValues(
+ () -> new ValueTransformer<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic
+ }
+
+ @Override
+ public String transform(String value) {
+ return value.toUpperCase();
+ }
+
+ @Override
+ public void close() {
+ // Cleanup logic
+ }
+ },
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic
+ }
+
+ @Override
+ public void process(String key, String value) {
+ String transformedValue = value.toUpperCase();
+ context.forward(key, transformedValue);
+ }
+
+ @Override
+ public void close() {
+ // Cleanup logic
+ }
+ },
+ "state-store"
+);</code></pre>
+
+ <h4>4. Migrate from <code>flatTransformValues</code> to
<code>process</code></h4>
+
+ <pre class="line-numbers"><code class="language-java">// Before
+KStream<String> flatTransformedValuesStream = stream.flatTransformValues(
+ () -> new ValueTransformerWithKey<String, String, Iterable<String>>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic
+ }
+
+ @Override
+ public Iterable<String> transform(String readOnlyKey, String value) {
+ return Arrays.asList(value.split(","));
+ }
+
+ @Override
+ public void close() {
+ // Cleanup logic
+ }
+ },
+ "state-store"
+);
+
+// After
+KStream<String, String> processedStream = stream.process(
+ () -> new Processor<String, String>() {
+ @Override
+ public void init(ProcessorContext context) {
+ // Initialization logic
+ }
+
+ @Override
+ public void process(String key, String value) {
+ for (String transformedValue : value.split(",")) {
+ context.forward(key, transformedValue);
+ }
+ }
+
+ @Override
+ public void close() {
+ // Cleanup logic
+ }
+ },
+ "state-store"
+);</code></pre>
+
+ <h3>Benefits of Migrating to <code>process</code></h3>
+ <ul>
+ <li><strong>Unified API:</strong> Consolidates multiple
methods into a single, versatile API.</li>
+ <li><strong>Flexibility:</strong> Simplifies the
implementation of custom processing logic.</li>
+ <li><strong>Future-Proof:</strong> Ensures compatibility
with the latest Kafka Streams releases.</li>
Review Comment:
I would add "Improved type-safty" that the "new PAPI" offers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]