vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427543403

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() {
  assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
  assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
  }
-
+
+ @Test
+ public void shouldProcessViaRepartitionTopic() {

Review comment:
Thanks!

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API
  Note that you need brokers with version 2.5 or newer to use this feature.
  </p>
  <p>
- As of 2.6.0 Kafka Streams offers a new <code>KStream.repartition()</code> operator (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
+ As of 2.6.0 Kafka Streams deprecates <code>KStream.through()<code> if favor of the new <code>KStream.repartition()</code> operator
+ (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
  <code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
- Refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
+ If you need to write into and read back from a topic you mange by your own, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.

Review comment:
```suggestion
If you need to write into and read back from a topic that you manage, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
```

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -1763,32 +1763,23 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
  streams/tables of a join – it is up to the user to ensure that this is the case.</p>
  </div>
  <p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually.
- You may follow a procedure such as outlined below.</p>
+ You may follow a procedure such as outlined below.
+ It is recommended to repartitiont to topic with fewers partitions to match the larger partition number of avoid bottlenecks.

Review comment:
```suggestion
It is recommended to repartition the topic with fewer partitions to match the larger partition number to avoid bottlenecks.
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1082,7 +1081,7 @@ public void cleanUp() {
  * This will use the default Kafka Streams partitioner to locate the partition.
  * If a {@link StreamPartitioner custom partitioner} has been
  * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
- * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
+ * {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input

Review comment:
Might as well make this update, since we may remove the methods at different times.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -265,6 +265,7 @@ public void shouldProcessingFromSinkTopic() {
  processorSupplier.theCapturedProcessor().processed);
  }
+ @SuppressWarnings("deprecation")

Review comment:
My opinion is that it's generally better not to suppress but instead just deprecate this method as well. It's not really that important for tests, since no one else is going to call the method, so feel free to take or leave the advice.

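The trade-off in the comment above can be sketched in plain Java. This is only an illustration: `LegacyApi.through` is a hypothetical stand-in for a deprecated method such as `KStream#through`, and none of the names below come from the PR. As far as I understand the Java Language Specification, javac does not report a deprecation warning when the use sits inside a declaration that is itself annotated `@Deprecated`, so both variants should compile without warnings, but only the second keeps the deprecation visible to callers.

```java
// Hypothetical stand-in for a deprecated API method such as KStream#through.
final class LegacyApi {
    private LegacyApi() { }

    @Deprecated
    static String through(final String topic) {
        return "through:" + topic;
    }
}

final class DeprecationDemo {
    private DeprecationDemo() { }

    // Variant 1: suppress the warning. Callers of this method get no hint
    // that it relies on deprecated API.
    @SuppressWarnings("deprecation")
    static String viaSuppress() {
        return LegacyApi.through("topic");
    }

    // Variant 2: deprecate the caller as well. The use of LegacyApi.through is
    // not flagged here, and code in other classes that calls this method
    // gets a deprecation warning of its own.
    @Deprecated
    static String viaDeprecate() {
        return LegacyApi.through("topic");
    }

    public static void main(final String[] args) {
        // Calls from within the same outermost class are not flagged.
        System.out.println(viaSuppress());
        System.out.println(viaDeprecate());
    }
}
```

For a test method that nobody else calls, the two variants behave the same in practice, which matches the "take or leave" framing of the comment.
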
##########
File path: docs/streams/upgrade-guide.html
##########
@@ -95,9 +95,11 @@ <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API
  Note that you need brokers with version 2.5 or newer to use this feature.
  </p>
  <p>
- As of 2.6.0 Kafka Streams offers a new <code>KStream.repartition()</code> operator (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
+ As of 2.6.0 Kafka Streams deprecates <code>KStream.through()<code> if favor of the new <code>KStream.repartition()</code> operator
+ (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
  <code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
- Refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
+ If you need to write into and read back from a topic you mange by your own, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
+ We refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.

Review comment:
```suggestion
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java
##########
@@ -21,33 +21,33 @@
  import org.apache.kafka.streams.processor.StreamPartitioner;
  import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
-class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
+public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {

Review comment:
It's worth noting that it only needs to be visible for the scala _tests_ that verify the scala Repartitioned builder results in a correctly configured object. For the public API, we only convert a scala Repartitioned to a java Repartitioned.

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
  * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
  * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
  * @see `org.apache.kafka.streams.kstream.KStream#through`
+ * @deprecated use `repartition()` instead
  */
+ @deprecated
  def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
  new KStream(inner.through(topic, produced))
+ /**
+ * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance
+ * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and
+ * topic name part.
+ * <p>
+ * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
+ * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+ * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
+ * `StreamsConfig` via parameter APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG`,
+ * "<name>" is either provided via `Repartitioned#as(String)` or an internally
+ * generated name, and "-repartition" is a fixed suffix.
+ * <p>
+ * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+ * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+ * <p>
+ * {{{
+ * Example:
+ *
+ * // brings implicit serdes in scope
+ * import Serdes._
+ *
+ * //..
+ * val clicksPerRegion: KStream[String, Long] = //..
+ *
+ * // Implicit serdes in scope will generate an implicit Produced instance, which
+ * // will be passed automatically to the call of through below
+ * clicksPerRegion.repartition
+ *
+ * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+ * // to the repartition call
+ * }}}
+ *
+ * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner`` which determines
+ * how records are distributed among partitions of the topic,
+ * part of the topic name, and number of partitions for a repartition topic.
+ * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+ * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+ */
+ def repartition()(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
I think we'd prefer:
```suggestion
def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =
```
similar to groupByKey, although I'm admittedly not sure if it actually matters.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic
  .toStream()
  .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
- input.through(INTERMEDIATE_USER_TOPIC)
- .groupByKey()
+ final KStream<Long, String> stream;
+ if (useRepartitioned) {
+ stream = input.repartition();
+ } else {
+ input.to(INTERMEDIATE_USER_TOPIC);
+ stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
I'm wondering if we should continue testing with `through`, to ensure it continues to work. WDYT?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
##########
@@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
  }
+ @SuppressWarnings("deprecation") // specifically testing the deprecated variant

Review comment:
This would be a case where I would advocate more strongly to deprecate _this_ method, to avoid accidentally "hiding" the deprecation from callers.

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
  * import Serdes._
  *
  * //..
- * val clicksPerRegion: KTable[String, Long] = //..
+ * val clicksPerRegion: KStream[String, Long] = //..

Review comment:
Oops...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org