[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r427752328 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java ## @@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti } +@SuppressWarnings("deprecation") // specifically testing the deprecated variant Review comment: Updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r427674857 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala ## @@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param produced the instance of Produced that gives the serdes and `StreamPartitioner` * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#through` + * @deprecated use `repartition()` instead */ + @deprecated def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] = new KStream(inner.through(topic, produced)) + /** + * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance + * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and + * topic name part. + * + * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. + * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. + * The topic will be named as "${applicationId}-name-repartition", where "applicationId" is user-specified in + * `StreamsConfig` via parameter APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG`, + * "name" is either provided via `Repartitioned#as(String)` or an internally + * generated name, and "-repartition" is a fixed suffix. + * + * The user can either supply the `Repartitioned` instance as an implicit in scope or she can also provide implicit + * key and value serdes that will be converted to a `Repartitioned` instance implicitly. + * + * {{{ + * Example: + * + * // brings implicit serdes in scope + * import Serdes._ + * + * //.. + * val clicksPerRegion: KStream[String, Long] = //.. 
+ * + * // Implicit serdes in scope will generate an implicit Produced instance, which + * // will be passed automatically to the call of through below + * clicksPerRegion.repartition + * + * // Similarly you can create an implicit Repartitioned and it will be passed implicitly + * // to the repartition call + * }}} + * + * @param repartitioned the `Repartitioned` instance used to specify `Serdes`, `StreamPartitioner`` which determines + * how records are distributed among partitions of the topic, + * part of the topic name, and number of partitions for a repartition topic. + * @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]] + * @see `org.apache.kafka.streams.kstream.KStream#repartition` + */ + def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] = Review comment: Not sure what we can/should test?
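The Scaladoc in this hunk describes the internal topic name as "${applicationId}-name-repartition". A minimal Java sketch of that naming rule follows; it relies only on the pattern quoted above. The class and method names (`RepartitionTopicNames`, `repartitionTopicName`) are hypothetical helpers for illustration and are not part of Kafka Streams.

```java
// Hypothetical helper, NOT Kafka Streams code: it only spells out the naming
// pattern quoted in the Scaladoc, "${applicationId}-name-repartition".
final class RepartitionTopicNames {

    // Fixed suffix, as stated in the Scaladoc.
    private static final String SUFFIX = "-repartition";

    private RepartitionTopicNames() { }

    // applicationId: the value configured via StreamsConfig APPLICATION_ID_CONFIG.
    // name: the name given via Repartitioned#as(String), or an internally generated one.
    static String repartitionTopicName(final String applicationId, final String name) {
        if (applicationId == null || name == null) {
            throw new NullPointerException("applicationId and name must not be null");
        }
        return applicationId + "-" + name + SUFFIX;
    }
}
```

For example, `RepartitionTopicNames.repartitionTopicName("my-app", "clicks")` yields `my-app-clicks-repartition`, which is why the name supplied by the user is only a part of the final topic name.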
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r427581563 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java ## @@ -254,6 +253,7 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti } +@SuppressWarnings("deprecation") // specifically testing the deprecated variant Review comment: Well, but then we would need to add more suppressions or deprecations upstream. That does not seem worth it for test code.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r427579474 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -815,9 +815,10 @@ * * @param topic the topic name * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} - * @see #repartition() - * @see #repartition(Repartitioned) + * @deprecated used {@link #repartition()} instead Review comment: Not sure why? If I use 2.6, why do I care whether it was deprecated in 2.2, 2.4, or 2.6? It's deprecated in the version I use now. Why would I care about older versions?
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r427578311 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic .toStream() .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); -input.through(INTERMEDIATE_USER_TOPIC) -.groupByKey() +final KStream stream; +if (useRepartitioned) { +stream = input.repartition(); +} else { +input.to(INTERMEDIATE_USER_TOPIC); +stream = builder.stream(INTERMEDIATE_USER_TOPIC); Review comment: Well, `through()` is literally implemented as `to()` + `stream()`... But I can revert and add a suppress annotation, too.
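The remark that `through()` amounts to `to()` followed by `stream()` can be illustrated with a toy model. The sketch below is an in-memory stand-in written for this note, not Kafka Streams code: `ToyTopics` and its methods are hypothetical, and a "topic" is just a named list of strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT Kafka Streams: a "topic" is just a named list.
final class ToyTopics {

    private final Map<String, List<String>> topics = new HashMap<>();

    // Append records to a named topic (stands in for KStream#to).
    void to(final String topic, final List<String> records) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(records);
    }

    // Return a copy of everything currently in the topic (stands in for StreamsBuilder#stream).
    List<String> stream(final String topic) {
        return new ArrayList<>(topics.getOrDefault(topic, new ArrayList<>()));
    }

    // Stands in for KStream#through as described in the comment: write, then read back.
    List<String> through(final String topic, final List<String> records) {
        to(topic, records);
        return stream(topic);
    }
}
```

In this model, calling `through("t", records)` on a fresh instance returns the same list as calling `to("t", records)` and then `stream("t")`, which is the equivalence the test change in the hunk above relies on.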
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211425 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -846,16 +847,13 @@ * from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}. * The number of partitions is determined based on the upstream topics partition numbers. * - * This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically. Review comment: Not 100% sure if we should remove this now, or when we remove `through()`?
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426214228 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.scala.kstream + +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ} +import org.apache.kafka.streams.processor.StreamPartitioner + +object Repartitioned { + + /** + * Create a Repartitioned instance with provided keySerde and valueSerde. + * + * @tparam K key type + * @tparam V value type + * @param keySerde Serde to use for serializing the key + * @param valueSerde Serde to use for serializing the value + * @return A new [[Repartitioned]] instance configured with keySerde and valueSerde + * @see KStream#repartition(Repartitioned) + */ + def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] = Review comment: I named all methods `with`, in alignment with the other Scala helper classes. I also noticed that all helper classes only have static methods... Is this by design?
It seems we are missing something here. If there is more than one optional parameter, we should have non-static methods to allow method chaining. (This could be fixed in a follow-up PR.)
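The point about static factories versus chainable instance methods can be sketched as follows. This is a toy configuration class written for illustration, not the real `Repartitioned` from Kafka Streams or its Scala wrapper; the class name `RepartitionedSketch` and its two optional parameters are assumptions chosen to keep the example small.

```java
// Toy configuration object, NOT org.apache.kafka.streams.kstream.Repartitioned.
// It contrasts a static factory with chainable instance methods.
final class RepartitionedSketch {

    private final String name;                // null means "let the library generate a name"
    private final Integer numberOfPartitions; // null means "derive from upstream topics"

    private RepartitionedSketch(final String name, final Integer numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Static factory: enough when only one optional parameter is set.
    static RepartitionedSketch as(final String name) {
        return new RepartitionedSketch(name, null);
    }

    // Instance methods return a modified copy, so several optional
    // parameters can be combined in a single expression.
    RepartitionedSketch withName(final String newName) {
        return new RepartitionedSketch(newName, numberOfPartitions);
    }

    RepartitionedSketch withNumberOfPartitions(final int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        return new RepartitionedSketch(name, partitions);
    }

    String name() {
        return name;
    }

    Integer numberOfPartitions() {
        return numberOfPartitions;
    }
}
```

With only static factories, a caller can set either the name or the partition count; with the instance methods, `RepartitionedSketch.as("clicks").withNumberOfPartitions(4)` sets both. Whether the Scala helper classes should follow this shape is the open question raised in the comment.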
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426212066 ## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala ## @@ -37,15 +37,15 @@ class ProducedTest extends FlatSpec with Matchers { internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass } - "Create a Produced with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in { + "Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in { Review comment: Side cleanup (was originally copied from `ConsumedTest` but not updated correctly)
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211984 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala ## @@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * import Serdes._ * * //.. - * val clicksPerRegion: KTable[String, Long] = //.. + * val clicksPerRegion: KStream[String, Long] = //.. Review comment: There is no `KTable#through()` method.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211787 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java ## @@ -132,7 +132,8 @@ private KafkaStreams createKafkaStreams(final Properties props) { .to("sum", Produced.with(stringSerde, longSerde)); if (withRepartitioning) { -final KStream repartitionedData = data.through("repartition"); +data.to("repartition"); +final KStream repartitionedData = builder.stream("repartition"); Review comment: As above. Avoid internal topics.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211719 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ## @@ -1393,6 +1409,11 @@ public void shouldPreserveSerdesForOperators() { assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde); +assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde()); +assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde()); +assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).keySerde(), mySerde); +assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).valueSerde(), mySerde); Review comment: replicating test cases ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ## @@ -1452,6 +1474,24 @@ public void shouldUseRecordMetadataTimestampExtractorWithThrough() { assertNull(processorTopology.source("topic-1").getTimestampExtractor()); } +@Test +public void shouldUseRecordMetadataTimestampExtractorWithRepartition() { Review comment: replicating test ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ## @@ -1467,6 +1507,21 @@ public void shouldSendDataThroughTopicUsingProduced() { assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0)))); } +@Test +public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() { Review comment: replicating test
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211688 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java ## @@ -42,7 +42,7 @@ private static final String TEST_ID = "reset-with-ssl-integration-test"; -private static Map sslConfig; +private static final Map SSL_CONFIG; Review comment: side cleanup ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ## @@ -588,6 +592,14 @@ public void shouldNotAllowNullTopicOnTo() { assertThat(exception.getMessage(), equalTo("topic can't be null")); } +@Test +public void shouldNotAllowNullRepartitionedOnRepartition() { Review comment: replicating test
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211685 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -213,7 +213,8 @@ private void runSimpleCopyTest(final int numberOfRestarts, final KStream input = builder.stream(inputTopic); KStream output = input; if (throughTopic != null) { -output = input.through(throughTopic); +input.to(throughTopic); +output = builder.stream(throughTopic); Review comment: Using `to()` and `stream()` is "simpler" because we clean up topics in between (and thus avoid internal topics). We could of course also use `repartition()`.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211624 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -484,8 +493,14 @@ private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic .toStream() .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); -input.through(INTERMEDIATE_USER_TOPIC) -.groupByKey() +final KStream stream; +if (useRepartitioned) { +stream = input.repartition(); +} else { +input.to(INTERMEDIATE_USER_TOPIC); +stream = builder.stream(INTERMEDIATE_USER_TOPIC); Review comment: We still need to test this, because topics using this pattern are still considered _intermediate_ topics, and the `--intermediate-topics` flag in `StreamsResetter` is still useful and unchanged.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211526 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java ## @@ -21,33 +21,33 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; -class RepartitionedInternal extends Repartitioned { +public class RepartitionedInternal extends Repartitioned { Review comment: Must be public to be visible in Scala
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211555 ## File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java ## @@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() { assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed); assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed); } - + +@Test +public void shouldProcessViaRepartitionTopic() { Review comment: Replicated the test for `through()` for `repartition()`.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211457 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -925,9 +920,8 @@ void to(final TopicNameExtractor topicExtractor, * Convert this stream to a {@link KTable}. * * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or - * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via - * {@link #through(String)}) an internal repartitioning topic will be created in Kafka. + * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka. Review comment: Just simplifying this one.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211408 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1082,7 +1081,7 @@ public void cleanUp() { * This will use the default Kafka Streams partitioner to locate the partition. * If a {@link StreamPartitioner custom partitioner} has been * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or - * {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input + * {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input Review comment: Not sure if this update is necessary. This method is deprecated itself.
[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on a change in pull request #8679: URL: https://github.com/apache/kafka/pull/8679#discussion_r426211270 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3679,58 +3670,6 @@ KTable-KTable Foreign-Key // Write the stream to the output topic, using explicit key and value serdes, // (thus overriding the defaults in the config properties). stream.to(my-stream-output-topic, Produced.with(Serdes.String(), Serdes.Long()); - - -Causes data re-partitioning if any of the following conditions is true: - -If the output topic has a different number of partitions than the stream/table. -If the KStream was marked for re-partitioning. -If you provide a custom StreamPartitioner to explicitly control how to distribute the output records -across the partitions of the output topic. -If the key of an output record is null. - - - -Through Review comment: The diff looks odd because the part above repeats below. The actual deletion starts here.