lucasbru commented on code in PR #18760:
URL: https://github.com/apache/kafka/pull/18760#discussion_r1940875377


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final 
ValueMapperWithKey<? super K, ? super V,
 
     /**
      * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using default serializers, deserializers, 
and producer's default partitioning strategy.
-     * The number of partitions is determined based on the upstream topics 
partition numbers.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * from the auto-generated topic.
+     *
+     * <p>The created topic is considered an internal topic and is meant to be 
used only by the current
+     * Kafka Streams instance.
+     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic is determined based 
on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and 
data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes, specify the number of used 
partitions or the partitioning strategy,
+     * or to customize the name of the repartition topic, use {@link 
#repartition(Repartitioned)}.
      *
-     * @return {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * @return A {@code KStream} that contains the exact same, but 
repartitioned records as this {@code KStream}.
      */
     KStream<K, V> repartition();
 
     /**
-     * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using {@link Serde key serde}, {@link 
Serde value serde}, {@link StreamPartitioner},
-     * number of partitions, and topic name part as defined by {@link 
Repartitioned}.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is either provided via {@link Repartitioned#as(String)} 
or an internally
-     * generated name, and "-repartition" is a fixed suffix.
-     *
-     * @param repartitioned the {@link Repartitioned} instance used to specify 
{@link Serdes},
-     *                      {@link StreamPartitioner} which determines how 
records are distributed among partitions of the topic,
-     *                      part of the topic name, and number of partitions 
for a repartition topic.
-     * @return a {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * See {@link #repartition()}.
      */
     KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
 
     /**
-     * Materialize this stream to a topic using default serializers specified 
in the config and producer's
-     * default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
+     * Materialize this stream to a topic.
+     * The topic should be manually created before it is used (i.e., before 
the Kafka Streams application is
      * started).
      *
-     * @param topic the topic name
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use 
{@link #to(String, Produced)}.
+     *
+     * @param topic
+     *        the output topic name
+     * 
+     * @see #to(TopicNameExtractor)
      */
     void to(final String topic);
 
     /**
-     * Materialize this stream to a topic using the provided {@link Produced} 
instance.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
-     * started).
-     *
-     * @param topic       the topic name
-     * @param produced    the options to use when producing to the topic
+     * See {@link #to(String).}
      */
     void to(final String topic,
             final Produced<K, V> produced);
 
     /**
-     * Dynamically materialize this stream to topics using default serializers 
specified in the config and producer's
-     * default partitioning strategy.
-     * The topic names for each record to send to is dynamically determined 
based on the {@link TopicNameExtractor}.
+     * Materialize the record of this stream to different topics.
+     * The provided {@link TopicNameExtractor} is applied to each input record 
to compute the output topic name.
+     * All topics should be manually created before they are use (i.e., before 
the Kafka Streams application is started).

Review Comment:
   ```suggestion
        * All topics should be manually created before they are used (i.e., 
before the Kafka Streams application is started).
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final 
ValueMapperWithKey<? super K, ? super V,
 
     /**
      * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using default serializers, deserializers, 
and producer's default partitioning strategy.
-     * The number of partitions is determined based on the upstream topics 
partition numbers.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * from the auto-generated topic.
+     *
+     * <p>The created topic is considered an internal topic and is meant to be 
used only by the current
+     * Kafka Streams instance.
+     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic is determined based 
on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and 
data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes, specify the number of used 
partitions or the partitioning strategy,
+     * or to customize the name of the repartition topic, use {@link 
#repartition(Repartitioned)}.
      *
-     * @return {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * @return A {@code KStream} that contains the exact same, but 
repartitioned records as this {@code KStream}.
      */
     KStream<K, V> repartition();
 
     /**
-     * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using {@link Serde key serde}, {@link 
Serde value serde}, {@link StreamPartitioner},
-     * number of partitions, and topic name part as defined by {@link 
Repartitioned}.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is either provided via {@link Repartitioned#as(String)} 
or an internally
-     * generated name, and "-repartition" is a fixed suffix.
-     *
-     * @param repartitioned the {@link Repartitioned} instance used to specify 
{@link Serdes},
-     *                      {@link StreamPartitioner} which determines how 
records are distributed among partitions of the topic,
-     *                      part of the topic name, and number of partitions 
for a repartition topic.
-     * @return a {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * See {@link #repartition()}.
      */
     KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
 
     /**
-     * Materialize this stream to a topic using default serializers specified 
in the config and producer's
-     * default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
+     * Materialize this stream to a topic.
+     * The topic should be manually created before it is used (i.e., before 
the Kafka Streams application is
      * started).
      *
-     * @param topic the topic name
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use 
{@link #to(String, Produced)}.
+     *
+     * @param topic
+     *        the output topic name
+     * 
+     * @see #to(TopicNameExtractor)
      */
     void to(final String topic);
 
     /**
-     * Materialize this stream to a topic using the provided {@link Produced} 
instance.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
-     * started).
-     *
-     * @param topic       the topic name
-     * @param produced    the options to use when producing to the topic
+     * See {@link #to(String).}
      */
     void to(final String topic,
             final Produced<K, V> produced);
 
     /**
-     * Dynamically materialize this stream to topics using default serializers 
specified in the config and producer's
-     * default partitioning strategy.
-     * The topic names for each record to send to is dynamically determined 
based on the {@link TopicNameExtractor}.
+     * Materialize the record of this stream to different topics.
+     * The provided {@link TopicNameExtractor} is applied to each input record 
to compute the output topic name.
+     * All topics should be manually created before they are use (i.e., before 
the Kafka Streams application is started).
+     *
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use 
{@link #to(TopicNameExtractor, Produced)}.
      *
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
+     * @param topicExtractor
+     *        the extractor to determine the name of the Kafka topic to write 
to for each record
      */
     void to(final TopicNameExtractor<K, V> topicExtractor);
 
     /**
-     * Dynamically materialize this stream to topics using the provided {@link 
Produced} instance.
-     * The topic names for each record to send to is dynamically determined 
based on the {@link TopicNameExtractor}.
-     *
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
-     * @param produced          the options to use when producing to the topic
+     * See {@link #to(TopicNameExtractor)}.

Review Comment:
   The parameter `produced` is not explained anywhere now. Same for the other 
methods. Would it make sense to refer from most generic to the most specific 
overload instead?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -758,185 +756,118 @@ <VR> KStream<K, VR> flatMapValues(final 
ValueMapperWithKey<? super K, ? super V,
 
     /**
      * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using default serializers, deserializers, 
and producer's default partitioning strategy.
-     * The number of partitions is determined based on the upstream topics 
partition numbers.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * from the auto-generated topic.
+     *
+     * <p>The created topic is considered an internal topic and is meant to be 
used only by the current
+     * Kafka Streams instance.
+     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
+     * The number of partitions for the repartition topic is determined based 
on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and 
data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * To explicitly set key/value serdes, specify the number of used 
partitions or the partitioning strategy,
+     * or to customize the name of the repartition topic, use {@link 
#repartition(Repartitioned)}.
      *
-     * @return {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * @return A {@code KStream} that contains the exact same, but 
repartitioned records as this {@code KStream}.
      */
     KStream<K, V> repartition();
 
     /**
-     * Materialize this stream to an auto-generated repartition topic and 
create a new {@code KStream}
-     * from the auto-generated topic using {@link Serde key serde}, {@link 
Serde value serde}, {@link StreamPartitioner},
-     * number of partitions, and topic name part as defined by {@link 
Repartitioned}.
-     * <p>
-     * The created topic is considered as an internal topic and is meant to be 
used only by the current Kafka Streams instance.
-     * Similar to auto-repartitioning, the topic will be created with infinite 
retention time and data will be automatically purged by Kafka Streams.
-     * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
-     * "&lt;name&gt;" is either provided via {@link Repartitioned#as(String)} 
or an internally
-     * generated name, and "-repartition" is a fixed suffix.
-     *
-     * @param repartitioned the {@link Repartitioned} instance used to specify 
{@link Serdes},
-     *                      {@link StreamPartitioner} which determines how 
records are distributed among partitions of the topic,
-     *                      part of the topic name, and number of partitions 
for a repartition topic.
-     * @return a {@code KStream} that contains the exact same repartitioned 
records as this {@code KStream}.
+     * See {@link #repartition()}.
      */
     KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
 
     /**
-     * Materialize this stream to a topic using default serializers specified 
in the config and producer's
-     * default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
+     * Materialize this stream to a topic.
+     * The topic should be manually created before it is used (i.e., before 
the Kafka Streams application is
      * started).
      *
-     * @param topic the topic name
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use 
{@link #to(String, Produced)}.
+     *
+     * @param topic
+     *        the output topic name
+     * 
+     * @see #to(TopicNameExtractor)
      */
     void to(final String topic);
 
     /**
-     * Materialize this stream to a topic using the provided {@link Produced} 
instance.
-     * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
-     * started).
-     *
-     * @param topic       the topic name
-     * @param produced    the options to use when producing to the topic
+     * See {@link #to(String).}
      */
     void to(final String topic,
             final Produced<K, V> produced);
 
     /**
-     * Dynamically materialize this stream to topics using default serializers 
specified in the config and producer's
-     * default partitioning strategy.
-     * The topic names for each record to send to is dynamically determined 
based on the {@link TopicNameExtractor}.
+     * Materialize the record of this stream to different topics.
+     * The provided {@link TopicNameExtractor} is applied to each input record 
to compute the output topic name.
+     * All topics should be manually created before they are use (i.e., before 
the Kafka Streams application is started).
+     *
+     * <p>To explicitly set key/value serdes or the partitioning strategy, use 
{@link #to(TopicNameExtractor, Produced)}.
      *
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
+     * @param topicExtractor
+     *        the extractor to determine the name of the Kafka topic to write 
to for each record
      */
     void to(final TopicNameExtractor<K, V> topicExtractor);
 
     /**
-     * Dynamically materialize this stream to topics using the provided {@link 
Produced} instance.
-     * The topic names for each record to send to is dynamically determined 
based on the {@link TopicNameExtractor}.
-     *
-     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
-     * @param produced          the options to use when producing to the topic
+     * See {@link #to(TopicNameExtractor)}.
      */
     void to(final TopicNameExtractor<K, V> topicExtractor,
             final Produced<K, V> produced);
 
     /**
      * Convert this stream to a {@link KTable}.
-     * <p>
-     * If a key changing operator was used before this operation (e.g., {@link 
#selectKey(KeyValueMapper)},
-     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
-     * {@link #process(ProcessorSupplier, String...)}) an internal 
repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", 
where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * The conversion is a logical operation and only changes the 
"interpretation" of the records, i.e., each record of
+     * this stream is a "fact/event" and is re-interpreted as a 
"change/update-per-key" now
+     * (cf. {@link KStream} vs {@link KTable}). The resulting {@link KTable} 
is essentially a changelog stream.
+     * To "upsert" the records of this stream into a materialized {@link 
KTable} (i.e., into a state store),
+     * use {@link #toTable(Materialized)}.
+     *
+     * <p>Note that {@code null} keys are not supported by {@code KTables} and 
records with {@code null} key will be dropped.
+     *
+     * <p>If a key changing operator was used before this operation (e.g., 
{@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or 
{@link #process(ProcessorSupplier, String...)})
+     * Kafka Streams will automatically repartition the data, i.e., it will 
create an internal repartitioning topic in
+     * Kafka and write and re-read the data via this topic such that the 
resulting {@link KTable} is correctly
+     * partitioned by its key.
+     *
+     * <p>This internal repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition",
+     * where "applicationId" is user-specified in {@link StreamsConfig} via 
parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "&lt;name&gt;" is an internally generated name, and "-repartition" is a 
fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * For this case, all data of this stream will be redistributed through 
the repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the 
resulting {@link KTable} is partitioned
-     * correctly on its key.
-     * Note that you cannot enable {@link 
StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
-     * repartition topics are considered transient and don't allow to recover 
the result {@link KTable} in cause of
+     * The number of partitions for the repartition topic is determined based 
on the upstream topics partition numbers.
+     * Furthermore, the topic will be created with infinite retention time and 
data will be automatically purged
+     * by Kafka Streams.
+     *
+     * <p>Note: If the result {@link KTable} is materialized, it is not 
possible to apply
+     * {@link StreamsConfig#REUSE_KTABLE_SOURCE_TOPICS "source topic 
optimization"}, because

Review Comment:
   I'm not aware of this syntax. Do you know / have you checked that it works?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to