vvcephei commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r427623832

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated

Review comment:
We should provide the arguments to the annotation to explain the deprecation. This gives better IDE integration than the ScalaDoc (which we won't need anymore):
```suggestion
  @deprecated("use `repartition()` instead", "2.6.0")
```

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##########
@@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
+   * @deprecated use `repartition()` instead
    */
+  @deprecated
   def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
     new KStream(inner.through(topic, produced))
 
+  /**
+   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance
+   * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and
+   * topic name part.
+   * <p>
+   * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
+   * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
+   * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
+   * `StreamsConfig` via parameter APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG`,
+   * "<name>" is either provided via `Repartitioned#as(String)` or an internally
+   * generated name, and "-repartition" is a fixed suffix.
+   * <p>
+   * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Repartitioned` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KStream[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Produced instance, which
+   * // will be passed automatically to the call of through below
+   * clicksPerRegion.repartition
+   *
+   * // Similarly you can create an implicit Repartitioned and it will be passed implicitly
+   * // to the repartition call
+   * }}}
+   *
+   * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner`` which determines
+   *                      how records are distributed among partitions of the topic,
+   *                      part of the topic name, and number of partitions for a repartition topic.
+   * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#repartition`
+   */
+  def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =

Review comment:
I just noticed that we have no test for this operator (or for through). Should we add one?
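
For reference on the first comment above, here is a minimal, self-contained sketch of how the two-argument form of Scala's `@deprecated` annotation reads at a definition site. The `Stream` class, its `items` field, and `DeprecationDemo` are hypothetical stand-ins made up for illustration; they are not the real `KStream` wrapper and use no Kafka APIs.

```scala
// Hypothetical stand-in for the KStream wrapper; only the annotation usage matters.
class Stream[A](val items: List[A]) {

  // First argument: the message shown to callers.
  // Second argument: the version in which the deprecation started.
  @deprecated("use `repartition()` instead", "2.6.0")
  def through(topic: String): Stream[A] = new Stream(items)

  // The replacement the deprecation message points to.
  def repartition(): Stream[A] = new Stream(items)
}

object DeprecationDemo {
  def main(args: Array[String]): Unit = {
    val s = new Stream(List(1, 2, 3))
    // This call still compiles and runs; scalac reports a deprecation
    // warning for it (shown in full when compiling with -deprecation).
    val viaThrough = s.through("some-topic")
    val viaRepartition = s.repartition()
    println(viaThrough.items == viaRepartition.items)
  }
}
```

With the message and version supplied on the annotation, the compiler and IDEs can surface the reason at each call site, which is the advantage the comment attributes to it over a ScalaDoc-only `@deprecated` tag. Note that a build using `-Xfatal-warnings` would reject the `through` call in this sketch.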