vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427543403



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() {
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", 
"aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
         assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", 
"aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
     }
-    
+
+    @Test
+    public void shouldProcessViaRepartitionTopic() {

Review comment:
       Thanks!

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" 
href="#streams_api_changes_260">Streams API
         Note that you need brokers with version 2.5 or newer to use this 
feature.
     </p>
     <p>
-        As of 2.6.0 Kafka Streams offers a new 
<code>KStream.repartition()</code> operator (as per <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint";>KIP-221</a>).
+        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()<code> if 
favor of the new <code>KStream.repartition()</code> operator
+        (as per <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint";>KIP-221</a>).
         <code>KStream.repartition()</code> is similar to 
<code>KStream.through()</code>, however Kafka Streams will manage the topic for 
you.
-        Refer to the <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer
 guide</a> for more details.
+        If you need to write into and read back from a topic you mange by your 
own, you can fall back to use <code>KStream.to()</code> in combination with 
<code>StreamsBuilder#stream()</code>.

Review comment:
       ```suggestion
           If you need to write into and read back from a topic that you manage, 
you can fall back to using <code>KStream.to()</code> in combination with 
<code>StreamsBuilder#stream()</code>.
   ```

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -1763,32 +1763,23 @@ <h4><a id="streams_concepts_globalktable" 
href="#streams_concepts_globalktable">
                                 streams/tables of a join &#8211; it is up to 
the user to ensure that this is the case.</p>
                         </div>
                         <p><strong>Ensuring data co-partitioning:</strong> If 
the inputs of a join are not co-partitioned yet, you must ensure this manually.
-                            You may follow a procedure such as outlined 
below.</p>
+                            You may follow a procedure such as outlined below.
+                            It is recommended to repartitiont to topic with 
fewers partitions to match the larger partition number of avoid bottlenecks.

Review comment:
       ```suggestion
                               It is recommended to repartition the topic with 
fewer partitions to match the larger partition number to avoid bottlenecks.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1082,7 +1081,7 @@ public void cleanUp() {
      * This will use the default Kafka Streams partitioner to locate the 
partition.
      * If a {@link StreamPartitioner custom partitioner} has been
      * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link 
StreamsConfig} or
-     * {@link KStream#through(String, Produced)}, or if the original {@link 
KTable}'s input
+     * {@link KStream#repartition(Repartitioned)}, or if the original {@link 
KTable}'s input

Review comment:
       Might as well make this update, since we may remove the methods at 
different times.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -265,6 +265,7 @@ public void shouldProcessingFromSinkTopic() {
                  processorSupplier.theCapturedProcessor().processed);
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       My opinion is that it's generally better not to suppress but instead 
just deprecate this method as well. It's not really that important for tests, 
since no one else is going to call the method, so feel free to take or leave 
the advice.
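
To illustrate the trade-off, here is a minimal plain-Java sketch. `LegacyApi` and `DeprecationDemo` are hypothetical stand-ins, not Kafka classes. It relies on the Java rule that the compiler does not warn about a use of deprecated API when the use sits inside an element that is itself deprecated, so both variants compile without a deprecation warning, but only the second keeps the deprecation visible to callers.

```java
// Hypothetical stand-in for an API with a deprecated method (not Kafka code).
final class LegacyApi {
    /** @deprecated use {@link #newWay()} instead */
    @Deprecated
    static String oldWay() {
        return "old";
    }

    static String newWay() {
        return "new";
    }
}

final class DeprecationDemo {
    // Option A: silence the warning. Callers of this method get no hint
    // that it depends on deprecated API.
    @SuppressWarnings("deprecation")
    static String suppressingCaller() {
        return LegacyApi.oldWay();
    }

    // Option B: deprecate the caller as well. No warning is reported for the
    // call below, and the deprecation stays visible to anyone calling this method.
    @Deprecated
    static String deprecatedCaller() {
        return LegacyApi.oldWay();
    }

    // @Deprecated is retained at runtime, so it can be observed reflectively;
    // @SuppressWarnings is source-only and leaves no trace in the class file.
    static boolean isMarkedDeprecated(String methodName) {
        try {
            return DeprecationDemo.class
                .getDeclaredMethod(methodName)
                .isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isMarkedDeprecated("suppressingCaller")); // false
        System.out.println(isMarkedDeprecated("deprecatedCaller"));  // true
    }
}
```

For a test method that nobody else calls, the difference is mostly cosmetic, which is why this is only a suggestion.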

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" 
href="#streams_api_changes_260">Streams API
         Note that you need brokers with version 2.5 or newer to use this 
feature.
     </p>
     <p>
-        As of 2.6.0 Kafka Streams offers a new 
<code>KStream.repartition()</code> operator (as per <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint";>KIP-221</a>).
+        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()<code> if 
favor of the new <code>KStream.repartition()</code> operator
+        (as per <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint";>KIP-221</a>).
         <code>KStream.repartition()</code> is similar to 
<code>KStream.through()</code>, however Kafka Streams will manage the topic for 
you.
-        Refer to the <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer
 guide</a> for more details.
+        If you need to write into and read back from a topic you mange by your 
own, you can fall back to use <code>KStream.to()</code> in combination with 
<code>StreamsBuilder#stream()</code>.
+        We refer to the <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer
 guide</a> for more details about <code>KStream.repartition()</code>.

Review comment:
       ```suggestion
           Please refer to the <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer
 guide</a> for more details about <code>KStream.repartition()</code>.
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java
##########
@@ -21,33 +21,33 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 
-class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
+public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {

Review comment:
       It's worth noting that it only needs to be visible for the scala _tests_ 
that verify the scala Repartitioned builder results in a correctly configured 
object. For the public API, we only convert a scala Repartitioned to a java 
Repartitioned.

##########
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and 
`StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially 
repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] 
=
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and creates a new [[KStream]] from the 
topic using the `Repartitioned` instance
+   * for configuration of the `Serde key serde`, `Serde value serde`, 
`StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter APPLICATION_ID_CONFIG 
APPLICATION_ID_CONFIG`,
+   * "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an 
internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in 
scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance 
implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Produced instance, 
which
+   * // will be passed automatically to the call of through below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be 
passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify 
`Serdes`, `StreamPartitioner`` which determines
+   *                      how records are distributed among partitions of the 
topic,
+   *                      part of the topic name, and number of partitions for 
a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records 
as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition()(implicit repartitioned: Repartitioned[K, V]): KStream[K, 
V] =

Review comment:
       I think we'd prefer:
   ```suggestion
     def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, 
V] =
   ```
   similar to groupByKey, although I'm admittedly not sure if it actually 
matters.
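
As a side note on the scaladoc in this hunk, the naming rule it describes ("${applicationId}-<name>-repartition") can be sketched in plain Java as follows. `RepartitionTopicNames` and `repartitionTopicName` are hypothetical names used only for illustration; this is not how Kafka Streams builds the name internally, and the application id and name values are made up.

```java
final class RepartitionTopicNames {
    // Fixed suffix named in the scaladoc.
    private static final String SUFFIX = "-repartition";

    // Hypothetical helper, not part of Kafka Streams: joins the application id,
    // the user-provided (or generated) name, and the fixed suffix.
    static String repartitionTopicName(String applicationId, String name) {
        return applicationId + "-" + name + SUFFIX;
    }

    public static void main(String[] args) {
        // Example values are assumptions chosen for this sketch.
        System.out.println(repartitionTopicName("clicks-app", "clicks-by-region"));
        // prints "clicks-app-clicks-by-region-repartition"
    }
}
```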

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology 
setupTopologyWithIntermediateUserTopic(final String outputTopic
             .toStream()
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
+        final KStream<Long, String> stream;
+        if (useRepartitioned) {
+            stream = input.repartition();
+        } else {
+            input.to(INTERMEDIATE_USER_TOPIC);
+            stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
       I'm wondering if we should continue testing with `through`, to ensure it 
continues to work. WDYT?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology 
getTopologyWithChangingValuesAfterChangingKey(final String opti
 
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated 
variant

Review comment:
       This would be a case where I would advocate more strongly to deprecate 
_this_ method, to avoid accidentally "hiding" the deprecation from callers.

##########
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * import Serdes._
    *
    * //..
-   * val clicksPerRegion: KTable[String, Long] = //..
+   * val clicksPerRegion: KStream[String, Long] = //..

Review comment:
       Oops...




