bbejeck commented on code in PR #18813:
URL: https://github.com/apache/kafka/pull/18813#discussion_r1947234658


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2074,283 +2080,54 @@ <TableValue, VOut> KStream<K, VOut> join(final 
KTable<K, TableValue> table,
      * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one output for each input {@code KStream} record
+     * By default, {@code KStream} records are processed by performing a 
lookup for matching records in the
+     * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+     * This default implementation does not handle out-of-order records in 
either input of the join well.
+     * See {@link #leftJoin(KTable, ValueJoiner, Joined)} on how to configure 
a stream-table join to handle out-of-order
+     * data.
+     *
+     * <p>For more details, about co-partitioning requirements, 
(auto-)repartitioning, and more see
+     * {@link #join(KStream, ValueJoiner, JoinWindows)}.
+     *
+     * @return A {@code KStream} that contains join-records, one for each 
matched stream record plus one for each
+     *         non-matching stream record, with the corresponding key and a 
value computed by the given {@link ValueJoiner}.
+     *
      * @see #join(KTable, ValueJoiner)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
-    <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoiner<? super V, ? super VT, 
? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+                                             final ValueJoiner<? super V, ? 
super VTable, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, 
all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoinerWithKey}.
-     * The key of the result record is the same as for both joining input 
records.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * See {@link #leftJoin(KTable, ValueJoiner)}.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one output for each input {@code KStream} 
record
-     * @see #join(KTable, ValueJoinerWithKey)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.

Review Comment:
   worth adding `which would lead to unexpected results` or something similar?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2074,283 +2080,54 @@ <TableValue, VOut> KStream<K, VOut> join(final 
KTable<K, TableValue> table,
      * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
      * </tr>
      * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one output for each input {@code KStream} record
+     * By default, {@code KStream} records are processed by performing a 
lookup for matching records in the
+     * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+     * This default implementation does not handle out-of-order records in 
either input of the join well.
+     * See {@link #leftJoin(KTable, ValueJoiner, Joined)} on how to configure 
a stream-table join to handle out-of-order
+     * data.
+     *
+     * <p>For more details, about co-partitioning requirements, 
(auto-)repartitioning, and more see
+     * {@link #join(KStream, ValueJoiner, JoinWindows)}.
+     *
+     * @return A {@code KStream} that contains join-records, one for each 
matched stream record plus one for each
+     *         non-matching stream record, with the corresponding key and a 
value computed by the given {@link ValueJoiner}.
+     *
      * @see #join(KTable, ValueJoiner)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
-    <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoiner<? super V, ? super VT, 
? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+                                             final ValueJoiner<? super V, ? 
super VTable, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, 
all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoinerWithKey}.
-     * The key of the result record is the same as for both joining input 
records.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * See {@link #leftJoin(KTable, ValueJoiner)}.
      *
-     * @param table  the {@link KTable} to be joined with this stream
-     * @param joiner a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param <VT>   the value type of the table
-     * @param <VR>   the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one output for each input {@code KStream} 
record
-     * @see #join(KTable, ValueJoinerWithKey)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.
      */
-    <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoinerWithKey<? super K, ? 
super V, ? super VT, ? extends VR> joiner);
+    <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+                                             final ValueJoinerWithKey<? super 
K, ? super V, ? super VTable, ? extends VOut> joiner);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all 
records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link KTable} the provided
-     * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
-     * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoiner}.
-     * The key of the result record is the same as for both joining input 
records.
-     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi-join.
+     * In contrast to {@link #leftJoin(KTable, ValueJoiner)}, but only if the 
used {@link KTable} is backed by a
+     * {@link org.apache.kafka.streams.state.VersionedKeyValueStore 
VersionedKeyValueStore}, the additional
+     * {@link Joined} parameter allows to specify a join grace-period, to 
handle out-of-order data gracefully.
      *
-     * @param table   the {@link KTable} to be joined with this stream
-     * @param joiner  a {@link ValueJoiner} that computes the join result for 
a pair of matching records
-     * @param joined  a {@link Joined} instance that defines the serdes to
-     *                be used to serialize/deserialize inputs and outputs of 
the joined streams
-     * @param <VT>    the value type of the table
-     * @param <VR>    the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoiner}, one output for each input {@code KStream} record
-     * @see #join(KTable, ValueJoiner, Joined)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * <p>For details about left-stream-table-join semantics see {@link 
#leftJoin(KTable, ValueJoiner)}.
+     * For co-partitioning requirements, (auto-)repartitioning, and more see 
{@link #join(KTable, ValueJoiner)}.
+     * If you specify a grace-period to handle out-of-order data, see {@link 
#join(KTable, ValueJoiner, Joined)}.
      */
-    <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoiner<? super V, ? super VT, 
? extends VR> joiner,
-                                     final Joined<K, V, VT> joined);
+    <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+                                             final ValueJoiner<? super V, ? 
super VTable, ? extends VOut> joiner,
+                                             final Joined<K, V, VTable> 
joined);
 
     /**
-     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
-     * serializers and deserializers.
-     * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, 
all records from this stream will produce an
-     * output record (cf. below).
-     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
-     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
-     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
-     * {@link KTable} state.
-     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
-     * will not produce any result records.
-     * <p>
-     * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link KTable} the provided
-     * {@link ValueJoinerWithKey} will be called to compute a value (with 
arbitrary type) for the result record.
-     * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoinerWithKey}.
-     * The key of the result record is the same as for both joining input 
records.
-     * Note that the key is read-only and should not be modified, as this can 
lead to undefined behaviour.
-     * If an {@code KStream} input record value is {@code null} the record 
will not be included in the join
-     * operation and thus no output record will be added to the resulting 
{@code KStream}.
-     * <p>
-     * Example:
-     * <table border='1'>
-     * <tr>
-     * <th>KStream</th>
-     * <th>KTable</th>
-     * <th>state</th>
-     * <th>result</th>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:A&gt;</td>
-     * <td></td>
-     * <td></td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
-     * </tr>
-     * <tr>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td></td>
-     * </tr>
-     * <tr>
-     * <td>&lt;K1:C&gt;</td>
-     * <td></td>
-     * <td>&lt;K1:b&gt;</td>
-     * <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
-     * </tr>
-     * </table>
-     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
-     * partitions.
-     * If this is not the case, you would need to call {@link 
#repartition(Repartitioned)} for this {@code KStream}
-     * before doing the join, specifying the same number of partitions via 
{@link Repartitioned} parameter as the given
-     * {@link KTable}.
-     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
-     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
-     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
-     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
-     * The repartitioning topic will be named 
"${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated
-     * name, and "-repartition" is a fixed suffix.
-     * <p>
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     * <p>
-     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
-     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
-     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
-     * correctly on its key.
+     * See {@link #leftJoin(KTable, ValueJoiner, Joined)}.
      *
-     * @param table   the {@link KTable} to be joined with this stream
-     * @param joiner  a {@link ValueJoinerWithKey} that computes the join 
result for a pair of matching records
-     * @param joined  a {@link Joined} instance that defines the serdes to
-     *                be used to serialize/deserialize inputs and outputs of 
the joined streams
-     * @param <VT>    the value type of the table
-     * @param <VR>    the value type of the result stream
-     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
-     * {@link ValueJoinerWithKey}, one output for each input {@code KStream} 
record
-     * @see #join(KTable, ValueJoinerWithKey, Joined)
-     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+     * <p>Note that the key is read-only and must not be modified, as this can 
lead to corrupt partitioning.

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to