mjsax commented on code in PR #18703: URL: https://github.com/apache/kafka/pull/18703#discussion_r1929493758
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -62,92 +62,59 @@ public interface KStream<K, V> {
     /**
      * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
      * All records that do not satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful record processing).
+     *
+     * @param predicate
+     *        a filter {@link Predicate} that is applied to each record
+     *
+     * @return A {@code KStream} that contains only those records that satisfy the given predicate.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each record
-     * @return a {@code KStream} that contains only those records that satisfy the given predicate
      * @see #filterNot(Predicate)
      */
     KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
-     * All records that do not satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * See {@link #filter(Predicate)}.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each record
-     * @param named a {@link Named} config used to name the processor in the topology
-     * @return a {@code KStream} that contains only those records that satisfy the given predicate
-     * @see #filterNot(Predicate)
+     * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
      */
     KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
 
     /**
      * Create a new {@code KStream} that consists all records of this stream which do <em>not</em> satisfy the given
      * predicate.
      * All records that <em>do</em> satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful record processing).
+     *
+     * @param predicate
+     *        a filter {@link Predicate} that is applied to each record
+     *
+     * @return A {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each record
-     * @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
      * @see #filter(Predicate)
      */
     KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new {@code KStream} that consists all records of this stream which do <em>not</em> satisfy the given
-     * predicate.
-     * All records that <em>do</em> satisfy the predicate are dropped.
-     * This is a stateless record-by-record operation.
+     * See {@link #filterNot(Predicate)}.
      *
-     * @param predicate a filter {@link Predicate} that is applied to each record
-     * @param named a {@link Named} config used to name the processor in the topology
-     * @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
-     * @see #filter(Predicate)
+     * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
      */
     KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
 
     /**
-     * Set a new key (with possibly new type) for each input record.
-     * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
+     * Create a new {@code KStream} that consists of all records of this stream but with a modified key.
+     * The provided {@link KeyValueMapper} is applied to each input record and computes a new key (possibly of a
+     * different type) for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
-     * This is a stateless record-by-record operation.
-     * <p>
-     * For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
-     * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
-     * length of the value string.
-     * <pre>{@code
-     * KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
-     * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer> {
-     *     Integer apply(Byte[] key, String value) {
-     *         return value.length();
-     *     }
-     * });
-     * }</pre>
-     * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
-     * join) is applied to the result {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
+     * stateful record processing).
      *
-     * @param mapper a {@link KeyValueMapper} that computes a new key for each record
-     * @param <KR> the new key type of the result stream
-     * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
-     * @see #map(KeyValueMapper)
-     * @see #flatMap(KeyValueMapper)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     */
-    <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);

Review Comment:
This method is not removed. It was only changed to additionally rename `<KR>` to `<KOut>`, which is what makes the diff look bad.