[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23148 Cool, opened with what I have so far, will keep an eye out for others for a while. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23182: Config change followup to [SPARK-26177] Automated...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/23182

Config change followup to [SPARK-26177] Automated formatting for Scala code

Let's keep this open for a while to see if other configuration tweaks are suggested

## What changes were proposed in this pull request?

Formatting configuration changes following up https://github.com/apache/spark/pull/23148

## How was this patch tested?

./dev/scalafmt

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 scalafmt-config

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23182.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23182

commit 07ca58ff2e7b0df19d4d755cba0152e323dc0d99
Author: cody koeninger
Date: 2018-11-29T18:03:16Z

    allow closing parens on same line
[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23148 Just pushed a tweak to allow closing parens on the same line. New PR for that, or do we want to keep identifying other tweaks first? I think the args-on-their-own-lines layout is triggered once the line is longer than maxColumn. There's an option for bin-packing argument lists, but it seems to apply only to literal arguments and a few other cases; see https://scalameta.org/scalafmt/docs/configuration.html#binpackliteralargumentlists and https://github.com/scalameta/scalafmt/blob/08c6798a40188ce69b3287e3026becfbd540a847/scalafmt-core/shared/src/main/scala/org/scalafmt/config/BinPack.scala I'm not aware of an option to format only the areas that the diff touches.
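For reference, a hedged sketch of what enabling the bin-packing behavior discussed above might look like in dev/.scalafmt.conf. The option names are real scalafmt settings, but the values here are illustrative and were not part of this PR:

```
# Illustrative only, not part of the committed SPARK-26177 config:
# pack literal arguments onto shared lines instead of one argument per line
binPack.literalArgumentLists = true
# only bin-pack literal argument lists with at least this many arguments
binPack.literalsMinArgCount = 5
```

Note this only affects calls whose arguments are literals, which is why it would not help the general case raised in the thread.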
[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23148 I tested that example, it will reformat to

```
checkScan(
  query,
  "struct,address:string,pets:int," +
    "friends:array>," +
    "relatives:map>>"
)
```

but after that, it will not complain about the long unbreakable string starting with "struct".
[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23148#discussion_r236858392 --- Diff: dev/.scalafmt.conf --- @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +align = none +align.openParenDefnSite = false +align.openParenCallSite = false +align.tokens = [] +docstrings = JavaDoc +maxColumn = 98 --- End diff -- For now, this won't complain about any code, it just gives a way for contributors to format it. If we did turn on test as part of the build process, it would complain about any code that was different after formatting. That could include code in that 99-101 column range, depending on whether it hit that corner case. E.g. ``` // this is 101 columns and will fail scalastyle, but will pass with scalafmt set to 99 if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}")) { ``` My thinking is that being slightly more strict than existing scalastyle is better than having automated formatting that won't pass other existing automated checks. Trying to find and upstream fixes to off-by-one errors in scalastyle is another option, but that will take a while. 
[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23148#discussion_r236824453 --- Diff: dev/.scalafmt.conf --- @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +align = none +align.openParenDefnSite = false +align.openParenCallSite = false +align.tokens = [] +docstrings = JavaDoc +maxColumn = 98 --- End diff -- I originally had this set to 100, and noticed a corner case where an ending space and curly brace were formatted on columns 100 and 101 respectively, making scalastyle complain.
[GitHub] spark issue #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23148 Jenkins, retest this please
[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23148#discussion_r236423672 --- Diff: .scalafmt.conf --- @@ -0,0 +1,24 @@ +# --- End diff -- Sure, moved
[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23148#discussion_r236423438 --- Diff: pom.xml --- @@ -156,6 +156,10 @@ 3.2.2 2.12.7 2.12 +1.5.1 --- End diff -- I moved the scalafmt version inline. I think the others need to stay as parameters so the shell wrapper can override them, to distinguish between the use cases of the build pipeline vs a contributor fixing code.
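For context, a minimal sketch of what wiring the mvn-scalafmt plugin into pom.xml could look like. The coordinates and parameter names follow the antipathy/mvn-scalafmt project; the exact values and parameterization in the merged PR may differ:

```xml
<!-- Hypothetical sketch; the actual PR may use different versions/parameters -->
<plugin>
  <groupId>org.antipathy</groupId>
  <artifactId>mvn-scalafmt_2.12</artifactId>
  <version>1.5.1</version>
  <configuration>
    <!-- shared config checked in under dev/ -->
    <configLocation>dev/.scalafmt.conf</configLocation>
    <!-- only format files that differ from git master, per the PR description -->
    <onlyChangedFiles>true</onlyChangedFiles>
    <branch>master</branch>
  </configuration>
</plugin>
```

The shell wrapper at ./dev/scalafmt would then just invoke the plugin goal, e.g. ./build/mvn mvn-scalafmt_2.12:format, overriding the parameters as needed.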
[GitHub] spark pull request #23148: [SPARK-26177] Automated formatting for Scala code
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/23148

[SPARK-26177] Automated formatting for Scala code

## What changes were proposed in this pull request?

Add a maven plugin and wrapper script at ./dev/scalafmt to use scalafmt to format files that differ from git master. Intention is for contributors to be able to use this to automate fixing code style, not to include it in build pipeline yet. If this PR is accepted, I'd make a different PR to update the code style section of https://spark.apache.org/contributing.html to mention the script

## How was this patch tested?

Manually tested by modifying a few files and running the script, then checking that scalastyle still passed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 scalafmt

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23148.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23148

commit 5df854a340f5e9e2fe7ba60194b4891235420369
Author: cody koeninger
Date: 2018-11-21T20:32:57Z

    WIP on scalafmt config

commit 57684272b60ff1213c957541f1db3f9d7aba1543
Author: cody koeninger
Date: 2018-11-21T21:22:15Z

    mvn plugin, e.g. ./build/mvn mvn-scalafmt_2.12:format

commit bd7baeca577bf9b519fe028d1e831fb7193e7af9
Author: cody koeninger
Date: 2018-11-22T17:07:06Z

    shell wrapper for mvn scalafmt plugin
[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23103 merging to master, thanks @zouzias
[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22824 @tdas or @jose-torres any opinion on whether it's worth refactoring these checks as suggested by @arunmahadevan?
[GitHub] spark issue #23103: [SPARK-26121] [Structured Streaming] Allow users to defi...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/23103 @zouzias can you add the new option to docs/structured-streaming-kafka-integration.md as part of this PR? Instructions for building docs are in docs/README.md; ping me if you need a hand.
[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23103#discussion_r235480695 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging { .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) .build() + /** + * Returns a unique consumer group (group.id), allowing the user to set the prefix of + * the consumer group + */ + private def streamingUniqueGroupId(parameters: Map[String, String], --- End diff -- the first arg should be on its own line as well
[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23103#discussion_r235479872 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging { .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) .build() + /** + * Returns a unique consumer group (group.id), allowing the user to set the prefix of + * the consumer group + */ + private def streamingUniqueGroupId(parameters: Map[String, String], + metadataPath: String): String = { +val groupIdPrefix = parameters + .getOrElse("group.id.prefix", "spark-kafka-source") --- End diff -- kafka.* is reserved for the existing kafka project's client configs, see e.g. line 86. I'd just go with groupIdPrefix
[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23103#discussion_r235462394 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging { .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) .build() + /** + * Returns a unique consumer group (group.id), allowing the user to set the prefix of + * the consumer group + */ + private def streamingUniqueGroupId(parameters: Map[String, String], + metadataPath: String): String = { +val groupIdPrefix = parameters + .getOrElse("group.id.prefix", "spark-kafka-source") --- End diff -- It seems like convention has been to mostly use camelCase for streaming options that aren't from the existing kafka.blah.whatever configuration namespace... e.g. subscribePattern, startingOffsets, maxOffsetsPerTrigger
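To make the naming convention discussed above concrete, a hedged usage sketch; the option name groupIdPrefix follows the suggestion in this thread, and the final merged name could differ:

```scala
// Hypothetical sketch, assuming the option lands as "groupIdPrefix"
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // kafka.* = pass-through client config
  .option("subscribe", "topic1")
  .option("groupIdPrefix", "my-app")               // camelCase, like subscribePattern etc.
  .load()
```

The point of the camelCase convention is that anything under kafka.* is forwarded to the Kafka client, while Spark-specific options stay outside that namespace.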
[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/23103#discussion_r235461374 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging { .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) .build() + /** + * Returns a unique consumer group (group.id), allowing the user to set the prefix of + * the consumer group + */ + private def streamingUniqueGroupId(parameters: Map[String, String], --- End diff -- Sorry there isn't an automatic formatter for this... but use 4 space indentation for multi-line argument lists https://github.com/databricks/scala-style-guide#spacing-and-indentation
[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21038 @SehanRathnayake Kafka is designed for at most one consumer per partition per consumer group at any given point in time, https://kafka.apache.org/documentation/#design_consumerposition Spark already manages creating a consumer per partition for the consumer group associated with a stream. If you have a valid use case for running multiple Spark applications with the same consumer group, please explain it in a JIRA, not in the discussion of a pull request that has already been merged.
[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22824 @arunmahadevan I think this is clear, even if it is redundant.
[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22824 Jenkins, ok to test
[GitHub] spark issue #22703: [SPARK-25705][BUILD][STREAMING] Remove Kafka 0.8 integra...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22703 I guess the only argument to the contrary would be if some of the known issues end up being better solved with minor API changes; in that case, leaving it marked as experimental would technically be better notice. I personally think it's clearer to remove the experimental marker. On Fri, Oct 12, 2018, 6:18 PM Sean Owen wrote: > *@srowen* commented on this pull request. > -- > > In docs/streaming-kafka-0-10-integration.md > <https://github.com/apache/spark/pull/22703#discussion_r224936431>: > > > @@ -3,7 +3,11 @@ layout: global > title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) > --- > > -The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [Direct Stream approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers). It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the [new Kafka consumer API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change. > +The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka > +partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses > +the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, > +there are notable differences in usage. This version of the integration is marked as experimental, so the API is > > Yeah, good general point. Is the kafka 0.10 integration at all > experimental anymore? Is anything that survives from 2.x to 3.x? I'd say > "no" in almost all cases. What are your personal views on that?
[GitHub] spark pull request #22703: [SPARK-25705][BUILD][STREAMING] Remove Kafka 0.8 ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/22703#discussion_r224899199 --- Diff: docs/streaming-kafka-0-10-integration.md --- @@ -3,7 +3,11 @@ layout: global title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) --- -The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [Direct Stream approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers). It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the [new Kafka consumer API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change. +The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka +partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses +the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, +there are notable differences in usage. This version of the integration is marked as experimental, so the API is --- End diff -- Do we want to leave the new integration marked as experimental if it is now the only available one?
[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22223 Thanks, merging to master
[GitHub] spark issue #22223: [SPARK-25233][Streaming] Give the user the option of spe...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22223 I think since the default behavior is still 1, it's probably ok to let someone do what they want here. On Wed, Aug 29, 2018 at 3:51 PM, rezasafi wrote: > *@rezasafi* commented on this pull request. > -- > > In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ > DirectKafkaInputDStream.scala > <https://github.com/apache/spark/pull/22223#discussion_r213829668>: > > > @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( > if (effectiveRateLimitPerPartition.values.sum > 0) { >    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 >    Some(effectiveRateLimitPerPartition.map { > -case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L) > +case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, > + Math.max(ppc.minRatePerPartition(tp), 1L)) > > I just didn't want to break the reasoning behind SPARK-18371 to have at > least 1 always. I didn't have any other reason for this. I can change it to > give the user the freedom.
[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/22223#discussion_r213825892 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala --- @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( if (effectiveRateLimitPerPartition.values.sum > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 Some(effectiveRateLimitPerPartition.map { -case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L) +case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, + Math.max(ppc.minRatePerPartition(tp), 1L)) --- End diff -- Is the second Math.max actually necessary? The default implementation of minRatePerPartition will be 1 anyway. If someone makes a custom implementation that e.g. returns zero, should they get what they asked for?
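A sketch of what the suggestion above might look like; this is hypothetical and assumes, as noted in the comment, that the default minRatePerPartition implementation already returns 1:

```scala
// Sketch: drop the hard-coded floor of 1 and trust the PerPartitionConfig.
// The default minRatePerPartition is 1 anyway, and a custom implementation
// that returns 0 would then get exactly what it asked for.
case (tp, limit) =>
  tp -> Math.max((secsPerBatch * limit).toLong, ppc.minRatePerPartition(tp))
```

The design question is whether Spark should enforce the at-least-1 invariant from SPARK-18371 itself, or delegate it entirely to the pluggable per-partition config.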
[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/22223#discussion_r212691622 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala --- @@ -141,10 +143,9 @@ private[spark] class DirectKafkaInputDStream[K, V]( tp -> Math.max(offset - currentOffsets(tp), 0) } val totalLag = lagPerPartition.values.sum - lagPerPartition.map { case (tp, lag) => val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp) - val backpressureRate = lag / totalLag.toDouble * rate + var backpressureRate = lag / totalLag.toDouble * rate --- End diff -- Why was this changed to a var?
[GitHub] spark pull request #22223: SPARK-25233. Give the user the option of specifyi...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/22223#discussion_r212691455 --- Diff: docs/configuration.md --- @@ -1925,6 +1925,14 @@ showDF(properties, numRows = 200, truncate = FALSE) first batch when the backpressure mechanism is enabled. + + spark.streaming.backpressure.fixedMinMessagePerPartition --- End diff -- This only applies to Kafka. Why not namespace it under spark.streaming.kafka? What does the word "fixed" add to the explanation? There's already a maxRatePerPartition, why not minRatePerPartition?
[GitHub] spark issue #22223: SPARK-25233. Give the user the option of specifying a fi...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22223 Jenkins, ok to test
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22138 Seeking means the pre-fetched data is wasted, so it's not a light operation. It shouldn't be unavoidable, e.g. if consumers were cached keyed by topicpartition, groupid, next offset to be processed. One concern there would be how to make sure you don't have lots of idle consumers. The question of how serious an issue this is could be settled by measurement, but I don't have production structured streaming jobs, much less ones that exhibit the kind of behavior tdas was talking about in the original ticket. On Mon, Aug 20, 2018 at 7:36 PM, Jungtaek Lim wrote: > @koeninger <https://github.com/koeninger> > > I'm not sure but are you saying that an executor cares about multiple > queries (multiple jobs) concurrently? I honestly didn't notice it. If that > is going to be problem, we should add something (could we get query id at > that time?) in cache key to differentiate consumers. If we want to avoid > extra seeking due to different offsets, consumers should not be reused > among with multiple queries, and that's just a matter of cache key. > > If you are thinking about co-use of consumers among multiple queries > because of reusing connection to Kafka, I think extra seeking is > unavoidable (I guess fetched data should be much more critical issue unless > we never reuse after returning to pool). If seeking is light operation, we > may even go with only reusing connection (not position we already sought): > always resetting position (and data maybe?) when borrowing from pool or > returning consumer to pool. > > Btw, the rationalization of this patch is not solving the issue you're > referring. This patch is also based on #20767 > <https://github.com/apache/spark/pull/20767> but dealing with another > improvements pointed out in comments: adopt pool library to not reinvent > the wheel, and also enabling metrics regarding the pool.
> > I'm not sure the issue you're referring is a serious one (show-stopper): > if the issue is a kind of serious, someone should handle the issue once we > are aware of the issue at March, or at least relevant JIRA issue should be > filed with detailed explanation before. I'd like to ask you in favor of > handling (or filing) the issue since you may know the issue best.
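A minimal sketch of the cache keying idea raised in this thread; this is hypothetical and the names are illustrative, not Spark's actual internals:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical cache key: including the next offset to be processed would let
// concurrent tasks on the same topicpartition/groupid each find a consumer
// positioned where they need it, avoiding wasted pre-fetched data from seeks.
// The trade-off flagged above: stale keys can accumulate idle cached consumers.
case class ConsumerCacheKey(
    topicPartition: TopicPartition,
    groupId: String,
    nextOffset: Long)
```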
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22138 see e.g. https://github.com/apache/spark/pull/20767 for background. Even if this patch doesn't change behavior, if it doesn't really solve the problem, it may make it harder to solve it correctly. On Mon, Aug 20, 2018 at 10:32 AM, Jungtaek Lim wrote: > If my understanding is right, looks like current approach has same > limitation. I guess you're busy, but could you refer some issue number or > point out some code lines which was based on the reason if you remember > any? It should help to determine whether this patch breaks more spots or > not.
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22138 I thought the whole reason the caching was changed from the initial naive approach to the current approach in master was that people were running jobs that were scheduling multiple consumers for the same topicpartition and group. On Sun, Aug 19, 2018 at 7:51 PM, Jungtaek Lim wrote: > @koeninger <https://github.com/koeninger> > I'm not sure I got your point correctly. This patch is based on some > assumptions, so please correct me if I'm missing here. Assumptions follow: > >1. > >There's actually no multiple consumers for a given key working at the >same time. The cache key contains topic partition as well as group id. Even >the query tries to do self-join so reading same topic in two different >sources, I think group id should be different. >2. > >In normal case the offset will be continuous, and that's why cache >should help. In retrying case this patch invalidates cache as same as >current behavior, so it should start from scratch. > > (Btw, I'm curious what's more expensive between leveraging pooled object > but resetting kafka consumer vs invalidating pooled objects and start > from scratch. Latter feels more safer but if we just need extra seek > instead of reconnecting to kafka, resetting could be improved and former > will be cheaper. I feel it is out of scope of my PR though.) > > This patch keeps most of current behaviors, except two spots I guess. I > already commented a spot why I change the behavior, and I'll comment > another spot for the same.
[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22143 Jenkins, ok to test
[GitHub] spark issue #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaData...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/22138 If you have multiple consumers for a given key, and those consumers are at different offsets, isn't it likely that the client code will not get the right consumer, leading to extra seeking?
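The extra-seek concern can be sketched in a few lines; `CachedConsumer` here is a made-up stand-in for illustration, not the actual KafkaDataConsumer API:

```scala
// Hypothetical sketch: two pooled consumers share the same
// (groupId, topicPartition) cache key but sit at different offsets.
// Whichever one the pool hands back, a request for an offset other than
// its buffered position forces a seek, discarding the prefetched records.
case class CachedConsumer(var nextOffset: Long) {
  var seeks = 0
  def get(requested: Long): Long = {
    if (requested != nextOffset) {
      seeks += 1            // cache miss: seek and re-poll
      nextOffset = requested
    }
    nextOffset += 1
    requested
  }
}
```

Under the assumption in the quoted message (only one consumer per key at a time), `seeks` stays at zero for a continuous range; interleaving two readers over the same cached instance is what drives it up.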
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917 Recursively creating a Kafka RDD during creation of a Kafka RDD would need a base case, but yeah, some way to have appropriate preferred locations.

On Mon, Aug 6, 2018 at 2:58 AM, Quentin Ambard wrote:
> @QuentinAmbard commented on this pull request, in external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala <https://github.com/apache/spark/pull/21917#discussion_r207802444>:
>
>     -      val fo = currentOffsets(tp)
>     -      OffsetRange(tp.topic, tp.partition, fo, uo)
>     +  /**
>     +   * Return the offset range. For non consecutive offsets the last offset must have a record.
>     +   * If offsets have missing data (transaction marker or abort), increases the
>     +   * range until we get the requested number of records or no more records.
>     +   * Because we have to iterate over all the records in this case,
>     +   * we also return the total number of records.
>     +   * @param offsets the target range we would like if offsets were continuous
>     +   * @return (totalNumberOfRecords, updated offset)
>     +   */
>     +  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
>     +    if (nonConsecutive) {
>     +      val localRw = rewinder()
>     +      val localOffsets = currentOffsets
>     +      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
>
> Are you suggesting I should create a new KafkaRDD instead, and consume from this RDD to get the last offset range?
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917 Example report of skipped offsets in a non-compacted, non-transactional situation: http://mail-archives.apache.org/mod_mbox/kafka-users/201801.mbox/%3ccakwx9vxc1cdosqwwwjk3qmyy3svvtmh+rjdrjyvsbejsds8...@mail.gmail.com%3E I asked on the kafka list about ways to tell if an offset is a transactional marker. I also asked about endOffsets alternatives, although I think that doesn't totally solve the problem (for instance, in cases where the batch size has been rate limited).

On Mon, Aug 6, 2018 at 2:57 AM, Quentin Ambard wrote:
> "By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because kafka skipped a message. AFAIK from the information you have from a kafka consumer, once you start allowing gaps in offsets, you don't know."
>
> Ok that's interesting, my understanding was that if you successfully poll and get results you are 100% sure that you don't lose anything. Do you have more details on that? Why would kafka skip a record while consuming?
>
> "Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels?"
>
> endOffsets returns the last offset (same as seekToEnd). But you're right that the easiest solution for us would be to have something like a seekToLastRecord method instead. Maybe something we could also ask?
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207721681
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+
+class OffsetWithRecordScannerSuite
+  extends SparkFunSuite
+  with Logging {
+
+  class OffsetWithRecordScannerMock[K, V](records: List[Option[ConsumerRecord[K, V]]])
+    extends OffsetWithRecordScanner[K, V](
+      Map[String, Object]("isolation.level" -> "read_committed").asJava, 1, 1, 0.75F, true) {
+    var i = -1
+    override protected def getNext(c: KafkaDataConsumer[K, V]): Option[ConsumerRecord[K, V]] = {
+      i = i + 1
+      records(i)
+    }
+  }
+
+  val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap())
+  val tp = new TopicPartition("topic", 0)
+
+  test("Rewinder construction should fail if isolation level isn't set to read_committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewinder construction shouldn't fail if isolation level isn't set") {
+    assert(new OffsetWithRecordScanner[String, String](
+      Map[String, Object]().asJava, 1, 1, 0.75F, true) != null)
+  }
+
+  test("Rewinder construction should fail if isolation level isn't set to committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewind should return the proper count.") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), Some(3)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 2)
+    assert(size === 2)
+  }
+
+  test("Rewind should return the proper count with gap") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(3), Some(4), Some(5)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 4)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count for the end of the iterator") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 3)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count missing data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 1)
+    assert(size === 1)
+  }
+
+  test("Rewind should return the proper count without data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 0)
+    assert(size === 0)
+
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207721657
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala ---
@@ -90,21 +90,23 @@ final class OffsetRange private(
     val topic: String,
     val partition: Int,
     val fromOffset: Long,
-    val untilOffset: Long) extends Serializable {
+    val untilOffset: Long,
+    val recordNumber: Long) extends Serializable {
--- End diff --
Does mima actually complain about binary compatibility if you just make recordNumber count? It's just an accessor either way... If so, and you have to do this, I'd name this recordCount consistently throughout. Number could refer to a lot of things that aren't counts.
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207721492
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -191,6 +211,11 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     buffer.previous()
   }
+
+  def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = {
--- End diff --
Is this used anywhere?
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207721482
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -183,6 +187,22 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     record
   }
+
+  /**
+   * Similar to compactedStart but will return None if poll doesn't
--- End diff --
Did you mean compactedNext?
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207721435
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offsets the last offset must have a record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of records or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offsets were continuous
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --
Because this isn't a kafka rdd, it isn't going to take advantage of preferred locations, which means it's going to create cached consumers on different executors.
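The preferred-locations point can be illustrated with a minimal sketch: an RDD that, like KafkaRDD, overrides `getPreferredLocations` so each partition is scheduled on the host that already caches a consumer for it. This class and its host map are invented for illustration; a `parallelize(...).mapPartitions` pipeline has no such override, so its tasks land on arbitrary executors.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

// Sketch only: pin each partition index to a (hypothetical) executor host.
// Without an override like this, Spark may run the task anywhere, and each
// new host creates its own cached Kafka consumer for the topicpartition.
class LocationAwareRDD(sc: SparkContext, hostFor: Map[Int, String])
    extends RDD[Int](sc, Nil) {

  override def getPartitions: Array[Partition] =
    hostFor.keys.toArray.map { i =>
      new Partition { override def index: Int = i }
    }

  override def compute(p: Partition, ctx: TaskContext): Iterator[Int] =
    Iterator(p.index)  // placeholder work

  override def getPreferredLocations(p: Partition): Seq[String] =
    hostFor.get(p.index).toSeq  // schedule near the cached consumer
}
```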
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917
> How do you know that offset 4 isn't just lost because poll failed?

By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because kafka skipped a message. AFAIK, from the information you have from a kafka consumer, once you start allowing gaps in offsets, you don't know. I understand your point, but even under your proposal you have no guarantee that the poll won't work in your first pass during RDD construction, and then fail on the executor during computation, right?

> The issue with your proposal is that SeekToEnd gives you the last offset which might not be the last record.

Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels? Your proposal might end up being the best approach anyway, just because of the unfortunate effect of StreamInputInfo and count, but I want to make sure we think this through.
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917 If the last offset in the range as calculated by the driver is 5, and on the executor all you can poll up to after a repeated attempt is 3, and the user already told you to allowNonConsecutiveOffsets... then you're done, no error. Why does it matter if you do this logic when you're reading all the messages in advance and counting, or when you're actually computing? To put it another way, this PR is a lot of code change and refactoring, why not just change the logic of e.g. how CompactedKafkaRDDIterator interacts with compactedNext?
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917 Still playing devil's advocate here, I don't think stopping at 3 in your example actually tells you anything about the cause of the gaps in the sequence at 4. I'm not sure you can know that the gap is because of a transaction marker, without a modified kafka consumer library. If the actual problem is that when allowNonConsecutiveOffsets is set we need to allow gaps even at the end of an offset range... why not just fix that directly? Master is updated to kafka 2.0 at this point, so we should be able to write a test for your original jira example of a partition consisting of 1 message followed by 1 transaction commit.
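The "fix that directly" alternative can be sketched as follows. All names here are illustrative, not the real CompactedKafkaRDDIterator internals: the idea is simply that when the user has opted into allowNonConsecutiveOffsets, an exhausted poll short of untilOffset ends the range quietly instead of erroring.

```scala
// Hypothetical sketch: drain a range, tolerating a gap at the end of the
// range when allowGaps (i.e. allowNonConsecutiveOffsets) is set.
// `next` stands in for a poll-backed fetch of the next available record
// offset, returning None when repeated polls yield nothing.
def drainRange(
    next: () => Option[Long],
    untilOffset: Long,
    allowGaps: Boolean): Seq[Long] = {
  val seen = scala.collection.mutable.ArrayBuffer[Long]()
  var done = false
  while (!done) {
    next() match {
      case Some(o) if o < untilOffset => seen += o
      case Some(_)                    => done = true  // reached end of range
      case None if allowGaps          => done = true  // gap at end: no error
      case None =>
        throw new IllegalStateException(s"missing records before $untilOffset")
    }
  }
  seen.toSeq
}
```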
[GitHub] spark issue #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21955 I don't see an obvious issue. Looks like zookeeper.connection.timeout.ms isn't being set, so it's defaulting to 6 seconds... could try tweaking it upwards.
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21955#discussion_r207664852
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
     brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
     server = new KafkaServer(brokerConf)
     server.startup()
-    brokerPort = server.boundPort()
+    brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --
Isn't the test hanging on the line right before that change though?
[GitHub] spark issue #21983: [SPARK-24987][SS] - Fix Kafka consumer leak when no new ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21983 Jenkins, ok to test
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207437645
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offsets the last offset must have a record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of records or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offsets were continuous
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
+        tpos.map { case (tp, o) =>
+          val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
+          (tp, offsetAndCount)
+        }
+      }).collect()
--- End diff --
What exactly is the benefit gained by doing a duplicate read of all the messages?
[GitHub] spark issue #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21955 jenkins, retest this please
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21955#discussion_r207422889
--- Diff: external/kafka-0-10/pom.xml ---
@@ -28,7 +28,7 @@
   spark-streaming-kafka-0-10_2.11
   streaming-kafka-0-10
-  0.10.0.1
+  2.0.0
   jar
   Spark Integration for Kafka 0.10
--- End diff --
Probably worth updating the name to indicate it's for brokers version 0.10+
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21917 jenkins, ok to test
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 LGTM, merging to master. Thanks!
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 What results are you seeing?

On Thu, Jul 12, 2018, 6:53 AM Yuanbo Liu wrote:
> @koeninger <https://github.com/koeninger> Sorry to interrupt, could you take a look at my patch?
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 @yuanboliu What I'm suggesting is more like this: https://github.com/apache/spark/compare/master...koeninger:SPARK-24713?expand=1
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 @yuanboliu From reading KafkaConsumer code, and from testing, I don't see where consumer.position() alone would un-pause topicpartitions. See below. Can you give a counter-example? I am seeing poll() reset the paused state. When you are having the problem, are you seeing the info level log messages "poll(0) returned messages"? If that's what's happening, I think the best we can do is call pause() in only one place, the first line of paranoidPoll, e.g.

    c.pause(c.assignment)
    val msgs = c.poll(0)

Here's what I saw in testing:

    scala> c.paused
    res34: java.util.Set[org.apache.kafka.common.TopicPartition] = []
    scala> c.assignment
    res35: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.pause(topics)
    scala> c.paused
    res37: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.position(tp)
    res38: Long = 248
    scala> c.paused
    res39: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.poll(0)
    res40: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe
    scala> c.paused
    res41: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.position(tp)
    res42: Long = 248
    scala> c.paused
    res43: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.poll(1)
    res44: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe
    scala> c.paused
    res45: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
    scala> c.poll(100)
    res46: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@28e4439b
    scala> c.paused
    res47: java.util.Set[org.apache.kafka.common.TopicPartition] = []
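The single-place pause could look roughly like this. This is a sketch, not the actual DirectKafkaInputDStream.paranoidPoll implementation; the method body and return value are assumptions made for illustration:

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Sketch: pause every assigned partition immediately before poll(0), since
// a longer poll can reset the paused state. The poll here is only for its
// side effects (joining the group, updating assignment/position); no records
// should come back while everything is paused.
def paranoidPoll[K, V](c: Consumer[K, V]): Map[TopicPartition, Long] = {
  c.pause(c.assignment())
  val msgs = c.poll(0)  // side effects only; records not expected
  c.assignment().asScala.map(tp => tp -> c.position(tp)).toMap
}
```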
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 @yuanboliu Can you clarify why repeated pause is necessary?
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21690 Jenkins, ok to test
[GitHub] spark issue #21651: [SPARK-18258] Sink need access to offset representation
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21651
- We need agreement on whether it is worth making a change to the public Sink api (probably not any time soon, judging from the spark 3.0 vs 2.4 discussion), or whether there is a different way to accomplish the goal.
- I wouldn't worry about what any particular sink implementation does with the offsets, most of the existing ones shouldn't do anything by default. You just need a proof of concept that a given sink (e.g. a database sink) can do something useful with them.
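A proof of concept along those lines might look like the following. Everything here is hypothetical: `OffsetAwareSink` and `offsetsJson` are invented names, not Spark API; the point is only that a database sink given the batch's offsets could commit data and offsets in one transaction, making batch replays idempotent.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical extension of the Sink idea: addBatch also receives the
// batch's offsets (serialized), so the sink can persist them with the data.
trait OffsetAwareSink {
  def addBatch(batchId: Long, data: DataFrame, offsetsJson: String): Unit
}

// Sketch of a database sink: writeTxn stands in for a JDBC transaction that
// inserts the rows and upserts the offsets atomically. If the stored offsets
// already match, the batch was committed before and can be skipped.
class JdbcOffsetSink(writeTxn: (DataFrame, String) => Unit)
    extends OffsetAwareSink {
  override def addBatch(batchId: Long, data: DataFrame, offsetsJson: String): Unit =
    writeTxn(data, offsetsJson)
}
```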
[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16006 #19431 was merged, thanks for your work. This PR should probably be closed.
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20997 I'm fine as well.
[GitHub] spark issue #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21300 @gatorsmile this is identical to the original PR, which was reviewed by @srowen, and discussion on the jira to backport it had not raised any objections since April
[GitHub] spark pull request #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] All...
Github user koeninger closed the pull request at: https://github.com/apache/spark/pull/21300
[GitHub] spark issue #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21300 Merging to branch-2.3
[GitHub] spark pull request #21300: [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] All...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/21300 [SPARK-24067][BACKPORT-2.3][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?
Backport of the bugfix in SPARK-17147. Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).

## How was this patch tested?
Added new unit test. justinrmiller has been testing this branch in production for a few weeks.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/daten-kieker/spark branch-2.3_kafkafix
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21300.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21300

commit 84648e0222330fd3bc43ce214689b66795efdffe
Author: cody koeninger <cody@...>
Date: 2018-02-27T14:21:11Z
[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?
Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).

## How was this patch tested?
Added new unit test. justinrmiller has been testing this branch in production for a few weeks.

Author: cody koeninger <c...@koeninger.org>
Closes #20572 from koeninger/SPARK-17147.
[GitHub] spark issue #19183: [SPARK-21960][Streaming] Spark Streaming Dynamic Allocat...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19183 @sansagara sounds reasonable to me
[GitHub] spark issue #19183: [SPARK-21960][Streaming] Spark Streaming Dynamic Allocat...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19183 I don't have personal experience with streaming dynamic allocation, but this patch makes sense to me and I don't see anything obviously wrong. I agree with Holden regarding tests.
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r184210716

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010] class InternalKafkaConsumer[K, V](
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  private[kafka010] val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    .asInstanceOf[String]
+
+  private val consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val topics = ju.Arrays.asList(topicPartition)
+    c.assign(topics)
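The hunk quoted above shows the delegation pattern under review: a sealed trait whose methods forward to an internal consumer, with only release() left abstract so cached and non-cached wrappers can behave differently. A minimal, self-contained sketch of that shape follows; the stub class and all names here are illustrative, not Spark's actual code:

```scala
// Sketch of the delegation pattern: the public trait forwards calls to an
// internal implementation, and only release() varies between wrappers.
// InternalConsumerStub stands in for a real KafkaConsumer.
class InternalConsumerStub {
  var closed = false
  def get(offset: Long): String = s"record@$offset"
  def close(): Unit = closed = true
}

sealed trait DataConsumerSketch {
  protected def internal: InternalConsumerStub
  def get(offset: Long): String = internal.get(offset) // delegates
  def release(): Unit                                  // varies per wrapper
}

// Non-cached wrapper: release closes the underlying consumer.
final class NonCached(protected val internal: InternalConsumerStub)
    extends DataConsumerSketch {
  def release(): Unit = internal.close()
}

// Cached wrapper: release keeps the consumer open for later reuse.
final class Cached(protected val internal: InternalConsumerStub)
    extends DataConsumerSketch {
  def release(): Unit = () // would be returned to a cache elsewhere
}
```

Because internalConsumer is hidden behind the trait, callers cannot tell whether they were handed a cached or a throwaway consumer, which is the point of the wrapper.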
[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19887 Merging to master, thanks @liu-zhaokun
[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19887 Seems ok to me, as long as it passes retest
[GitHub] spark issue #19887: [SPARK-21168] KafkaRDD should always set kafka clientId.
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19887 Jenkins, retest this please
[GitHub] spark issue #21038: [SPARK-22968][DStream] Throw an exception on partition r...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21038 Seems like that should help address the confusion. Merging to master.
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20997 I think if we can't come up with a pool design now that solves most of the issues, we should switch back to the one cached consumer approach that the SQL code is using. On Mon, Apr 16, 2018 at 3:25 AM, Gabor Somogyi <notificati...@github.com> wrote: > *@gaborgsomogyi* commented on this pull request. > -- > > In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ > KafkaDataConsumer.scala > <https://github.com/apache/spark/pull/20997#discussion_r181655790>: > > > + * If matching consumer doesn't already exist, will be created using kafkaParams. > + * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]]. > + * > + * Note: This method guarantees that the consumer returned is not currently in use by anyone > + * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by > + * caching them and tracking when they are in use. > + */ > + def acquire[K, V]( > + groupId: String, > + topicPartition: TopicPartition, > + kafkaParams: ju.Map[String, Object], > + context: TaskContext, > + useCache: Boolean): KafkaDataConsumer[K, V] = synchronized { > +val key = new CacheKey(groupId, topicPartition) > +val existingInternalConsumers = Option(cache.get(key)) > + .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]]) > > That's correct, the SQL part isn't keeping a linked list pool but a single > cached consumer. I was considering your suggestion and came to the same > conclusion: > > Can you clarify why you want to allow only 1 cached consumer per topicpartition, closing any others at task end? > > It seems like opening and closing consumers would be less efficient than allowing a pool of more than one consumer per topicpartition. > > Though limiting the number of cached consumers per groupId/TopicPartition > is a must as you've pointed out. 
On the other side if we go the SQL way > it's definitely less risky. Do you think we should switch back to the one > cached consumer approach?
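For context on the trade-off being debated above — a pool of reusable consumers per (groupId, topicPartition) key versus a single cached consumer — the pooling idea can be sketched with plain collections. ConsumerPool, the Int "consumers", and the per-key cap are all illustrative assumptions, not the proposed Spark API:

```scala
import scala.collection.mutable

// Hypothetical sketch of pooling consumers per cache key: hand out an idle
// cached consumer when one exists, create a new one otherwise, and cap how
// many idle consumers are kept per key. Consumers are modeled as plain Ints.
final case class CacheKey(groupId: String, partition: Int)

class ConsumerPool(maxIdlePerKey: Int) {
  private val idle = mutable.Map.empty[CacheKey, mutable.Queue[Int]]
  private var created = 0

  /** Reuse an idle consumer for this key, or create a new one. */
  def acquire(key: CacheKey): Int = synchronized {
    idle.get(key).flatMap(q => if (q.nonEmpty) Some(q.dequeue()) else None)
      .getOrElse { created += 1; created }
  }

  /** Return a consumer to the pool; drop (i.e. close) it if the cap is hit. */
  def release(key: CacheKey, consumer: Int): Unit = synchronized {
    val q = idle.getOrElseUpdate(key, mutable.Queue.empty)
    if (q.size < maxIdlePerKey) q.enqueue(consumer) // else: would close it
  }

  def createdCount: Int = synchronized(created)
}
```

The single-cached-consumer approach used by the SQL code is the degenerate case of this with a cap of one and eviction of any second consumer, which is why it is simpler but opens and closes consumers more often under concurrent reads of the same partition.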
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181507520

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+    val groupId: String,
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181506863

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- (quotes the same hunk as the r181507520 comment above)
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181506582

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- (quotes the same hunk as the r181507520 comment above)
[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21038 I can't think of a valid reason to create a configuration to allow it. It just fundamentally doesn't make sense to run different apps with the same group id. Trying to catch and rethrow the exception with more information might make sense. On Wed, Apr 11, 2018, 20:05 Saisai Shao <notificati...@github.com> wrote: > Thanks @koeninger <https://github.com/koeninger> for your comments. I > think your suggestion is valid, the log here is just pasted from JIRA, but > we also got the same issue from customer's report. > > Here in the PR description, I mentioned that using two apps with same > group id to mimic this issue. But I'm not sure the real use case from our > customer, maybe in their scenario such usage is valid. > > So I'm wondering if we can add a configuration to control whether it > should be fail or just warning. Also I think exception/warning log should > be improved to directly tell user about consumer rebalance issue, rather > than throwing from Kafka as "no current assignment for partition xxx".
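The "catch and rethrow the exception with more information" suggestion could look roughly like the sketch below. The exception type, the message-matching heuristic, and the wording are placeholder assumptions for illustration, not the actual Kafka or Spark classes:

```scala
// Hypothetical sketch: wrap a low-level rebalance error in a message that
// points at the likely misconfiguration (two apps sharing one group id),
// while keeping the original exception as the cause.
def withGroupIdHint[T](groupId: String)(body: => T): T =
  try body
  catch {
    case e: IllegalStateException
        if e.getMessage != null &&
          e.getMessage.contains("no current assignment") =>
      throw new IllegalStateException(
        s"Partition was revoked from group '$groupId'. This usually means " +
          "another application is running with the same group id; use a " +
          "separate group id for each stream.",
        e)
  }
```

Exceptions that don't match the guard propagate unchanged, so only the confusing rebalance case gets the extra context.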
[GitHub] spark issue #21038: [SPARK-22968][DStream] Fix Kafka partition revoked issue
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/21038 The log in the jira looks like it's from a consumer rebalance, i.e. more than one driver consumer was running with the same group id. Isn't the underlying problem here that the user is creating multiple streams with the same group id, despite what the documentation says? The log even says s/he copy-pasted the documentation group id "group use_a_separate_group_id_for_each_stream" I don't think we should silently "fix" that. As a user, I wouldn't expect app A to suddenly start processing only half of the partitions just because entirely different app B started with the (misconfigured) same group id.
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180282891

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- (quotes the same hunk as the r181507520 comment above, down to this line:)
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), --- End diff --

This is mostly vestigial (there used to be a remove method that took a groupId, but no kafkaParams, so there was symmetry). I don't see a reason it can't be changed to match the SQL version at this point, i.e. assign groupId from the params.
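Assigning groupId from the params, as suggested in the comment above, would look roughly like this sketch. "group.id" is the value of Kafka's ConsumerConfig.GROUP_ID_CONFIG constant; the class shape itself is illustrative, not the actual patch:

```scala
import java.{util => ju}

// Sketch: derive groupId from kafkaParams instead of passing it as a
// separate constructor argument and require()-ing that the two match.
class InternalConsumerSketch(val kafkaParams: ju.Map[String, Object]) {
  // "group.id" is the real Kafka config key (ConsumerConfig.GROUP_ID_CONFIG).
  val groupId: String = kafkaParams.get("group.id").asInstanceOf[String]
}
```

This removes one way for callers to get the cache key and the consumer's actual group id out of sync.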
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180283864

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- (quotes the same hunk as the r181507520 comment above)
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180284812 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + +private[kafka010] sealed trait KafkaDataConsumer[K, V] { + /** + * Get the record for the given offset if available. + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.get(offset, pollTimeoutMs) + } + + /** + * Start a batch on a compacted topic + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. 
+ */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { +internalConsumer.compactedStart(offset, pollTimeoutMs) + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + * + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.compactedNext(pollTimeoutMs) + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { +internalConsumer.compactedPrevious() + } + + /** + * Release this consumer from being further used. Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer[K, V] +} + + +/** + * A wrapper around Kafka's KafkaConsumer. + * This is not for direct use outside this file. 
+ */ +private[kafka010] +class InternalKafkaConsumer[K, V]( + val groupId: String, + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + @volatile private var consumer = createConsumer + + /** indicates whether this consumer is in use or not */ + @volatile var inUse = true + + /** indicate whether this consumer is going to be stopped in the next release */ + @volatile var markedForClose = false + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() + @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET + + override def toString: String = { +"InternalKafkaConsumer(" + + s"hash=${Integer.toHexString(hashCode)}, " + + s"groupId=$groupId, " + + s"topicPartition=$topicPartition)" + } + + /** Create a KafkaConsumer to fetch records for `topicPartition` */ + private def createConsumer: KafkaConsumer[K, V] = {
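The structure in the quoted diff, a sealed trait whose methods all delegate to a protected internalConsumer, reduces to a small sketch. The names below mirror the diff, but the bodies and the NonCachedConsumer variant are illustrative only, not the PR's actual code:

```scala
// Illustrative sketch of the delegation pattern in the quoted diff:
// the public trait forwards every call to an internal implementation,
// so concrete variants only need to decide what release() does.
final class InternalConsumer {
  def get(offset: Long): String = s"record@$offset" // stand-in for the real fetch
  def close(): Unit = ()
}

sealed trait DataConsumer {
  def get(offset: Long): String = internal.get(offset)
  def release(): Unit
  protected def internal: InternalConsumer
}

// Hypothetical non-cached variant: releasing simply closes the consumer.
final class NonCachedConsumer(protected val internal: InternalConsumer)
    extends DataConsumer {
  def release(): Unit = internal.close()
}
```

In the PR itself a cached variant would instead return the consumer to a cache on release(); everything else is shared through the trait.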
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180280531 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- [diff context identical to the first quoted excerpt above] + @volatile private var consumer = createConsumer --- End diff -- This is an example of the cut & paste I was referring to. In this case, I don't believe consumer is ever reassigned, so it doesn't even need to be a var. It was reassigned in the SQL version of the code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
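A minimal sketch of the val-versus-var point (hypothetical class, not the PR's code): a field that is assigned exactly once can be a plain val, and @volatile only matters for fields whose reference is actually reassigned across threads.

```scala
class ConsumerHolder {
  // Hypothetical stand-in for KafkaConsumer construction.
  private def createConsumer: AnyRef = new Object

  // Assigned once and never reassigned: a val is enough, and the
  // compiler then enforces that no later code mutates it.
  private val consumer = createConsumer

  // @volatile var is only warranted for fields like this one, which a
  // release/re-acquire path might genuinely reassign from another thread.
  @volatile private var inUse = true
}
```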
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180285599 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- [diff context identical to the first quoted excerpt above]
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180280210 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka010 + +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.kafka.clients.consumer.ConsumerConfig._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark._ + +class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var testUtils: KafkaTestUtils = _ + + override def beforeAll { +super.beforeAll() +testUtils = new KafkaTestUtils +testUtils.setup() + } + + override def afterAll { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } + + test("concurrent use of KafkaDataConsumer") { +KafkaDataConsumer.init(16, 64, 0.75f) + +val topic = "topic" + Random.nextInt() +val data = (1 to 1000).map(_.toString) +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic) +testUtils.sendMessages(topic, data.toArray) + +val groupId = "groupId" +val kafkaParams = Map[String, Object]( + GROUP_ID_CONFIG -> groupId, + BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, + KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + AUTO_OFFSET_RESET_CONFIG -> "earliest", + ENABLE_AUTO_COMMIT_CONFIG -> "false" +) + +val numThreads = 100 +val numConsumerUsages = 500 + +@volatile var error: Throwable = null + +def consume(i: Int): Unit = { + val useCache = Random.nextBoolean + val taskContext = if (Random.nextBoolean) { +new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) + } else { +null + } + val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( +groupId, topicPartition, kafkaParams.asJava, taskContext, useCache) + try { +val rcvd = 0 until data.length map { offset => + val bytes = consumer.get(offset, 
1).value() + new String(bytes) +} +assert(rcvd == data) + } catch { +case e: Throwable => + error = e + throw e + } finally { +consumer.release() + } +} + +val threadPool = Executors.newFixedThreadPool(numThreads) +try { + val futures = (1 to numConsumerUsages).map { i => +threadPool.submit(new Runnable { + override def run(): Unit = { consume(i) } +}) + } + futures.foreach(_.get(1, TimeUnit.MINUTES)) + assert(error == null) +} finally { + threadPool.shutdown() +} + } +} --- End diff -- If this PR is intended to fix a problem with silent reading of incorrect data, can you add a test reproducing that?
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r180283733 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- [diff context identical to the first quoted excerpt above]
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20997 In general, two things about this make me uncomfortable:
- It's basically a cut-and-paste of the SQL equivalent PR, https://github.com/apache/spark/pull/20767, but it is different from both that PR and the existing DStream code.
- I don't see an upper bound on the number of consumers per key, nor a way of reaping idle consumers.
If the SQL equivalent code is likely to be modified to use pooling of some kind, it seems better to make a consistent decision.
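For context, the two missing properties, an upper bound per key and reaping of idle consumers, could look roughly like the sketch below. Everything here (ConsumerPool, CachedConsumer, the key type, the bounds) is hypothetical and is not code from either PR; it assumes Scala 2.12+ for the SAM conversion in computeIfAbsent.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical stand-in for the real wrapper around KafkaConsumer.
class CachedConsumer(val key: String) {
  @volatile var lastUsedMs: Long = System.currentTimeMillis()
  def close(): Unit = ()
}

// Sketch of a bounded, idle-reaping pool keyed by e.g. (groupId, TopicPartition).
class ConsumerPool(maxPerKey: Int, maxIdleMs: Long) {
  private val pool =
    new ConcurrentHashMap[String, java.util.ArrayDeque[CachedConsumer]]()

  def acquire(key: String): CachedConsumer = {
    val q = pool.computeIfAbsent(key, _ => new java.util.ArrayDeque[CachedConsumer]())
    q.synchronized {
      val c = q.pollFirst()
      if (c != null) c else new CachedConsumer(key)
    }
  }

  def release(c: CachedConsumer): Unit = {
    c.lastUsedMs = System.currentTimeMillis()
    val q = pool.computeIfAbsent(c.key, _ => new java.util.ArrayDeque[CachedConsumer]())
    q.synchronized {
      if (q.size < maxPerKey) q.addFirst(c) // enforce the per-key upper bound
      else c.close()                        // over the bound: close instead of caching
    }
  }

  // Reap consumers that have sat idle longer than maxIdleMs.
  def reapIdle(): Unit = {
    val now = System.currentTimeMillis()
    pool.values.asScala.foreach { q =>
      q.synchronized {
        val it = q.iterator()
        while (it.hasNext) {
          val c = it.next()
          if (now - c.lastUsedMs > maxIdleMs) { it.remove(); c.close() }
        }
      }
    }
  }
}
```

The point of the sketch is only to show that both bounds fit naturally in one place (release and a periodic reap), which is part of why a shared pooling decision across the SQL and DStream code paths seems preferable.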
[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 Merged to master
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 @akonopko Thanks! Sorry, but I just noticed the title of the PR - can you adjust it to match convention, e.g. [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backpressure.initialRate to direct Kafka streams and then I'll get it merged ;)
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 Jenkins, ok to test
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774 Merged to master. Thanks @arzt!
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 @akonopko thanks for this; if you can resolve the merge conflict, I think we can get this in
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 Jenkins, ok to test
[GitHub] spark issue #17774: [SPARK-18371][Streaming] Spark Streaming backpressure ge...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/17774 LGTM. @tdas @zsxwing, absent any objections from you in the next couple of days, I'll merge this
[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19431 @tdas any concerns? If @omuravskiy doesn't express any objections (since these tests are basically taken directly from his linked PR) in the next couple of days, I'm inclined to merge this.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r173641331 --- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala --- @@ -456,6 +455,60 @@ class DirectKafkaStreamSuite ssc.stop() } + test("backpressure.initialRate should honor maxRatePerPartition") { +backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250) + } + + test("use backpressure.initialRate with backpressure") { --- End diff -- Aren't the descriptions of these tests backwards, i.e. is this the one testing that maxRatePerPartition is honored?
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/20767 Can you clarify why you want to allow only 1 cached consumer per topicpartition, closing any others at task end? It seems like opening and closing consumers would be less efficient than allowing a pool of more than one consumer per topicpartition.
[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16006 @omuravskiy can you comment on https://github.com/apache/spark/pull/19431 since it appears to be based on your PR
[GitHub] spark issue #16006: [SPARK-18580] [DStreams] [external/kafka-0-10] Use spark...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/16006 ok to test
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170799163 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) --- End diff -- Yeah, it's necessary, otherwise it gets treated as AnyRef. Changed to Float.valueOf FWIW
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169850605 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), --- End diff -- Agreed that require is better, will fix it in a sec Pretty sure assert is just a function in Predef.scala that throws an AssertionError, it's not like a java assert statement that can be en / disabled with java -ea / -da. 
Tested it out: https://gist.github.com/koeninger/6155cd94a19d1a6373ba0b40039e97e3 Disabling Scala asserts can be done at compile time with -Xdisable-assertions
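For reference, the behavioral difference being tested in the gist can be shown in a few lines. This is a standalone sketch, not the PR code:

```scala
// Predef.assert throws AssertionError and is marked @elidable, so it is
// removed entirely when compiling with scalac -Xdisable-assertions.
// Predef.require throws IllegalArgumentException and is never elided,
// which makes it the better fit for caller-facing preconditions.
object AssertVsRequire extends App {
  val fromAssert =
    try { assert(false, "boom"); "no error" }
    catch { case e: AssertionError => e.getMessage } // "assertion failed: boom"

  val fromRequire =
    try { require(false, "bad arg"); "no error" }
    catch { case e: IllegalArgumentException => e.getMessage } // "requirement failed: bad arg"

  println(fromAssert)
  println(fromRequire)
}
```

Unlike a Java `assert` statement, which the JVM toggles at launch with -ea/-da, the Scala version is elided (or not) when the code is compiled.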
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169538019 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private( // TODO if the buffer was kept around as a random-access structure, // could possibly optimize re-calculating of an RDD in the same batch - protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() --- End diff -- Agreed, think it should be ok
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169537541 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- [same compactedStart/compactedNext diff context as quoted above] +assert(buffer.hasNext(), --- End diff -- That's a "shouldn't happen unless the topicpartition or broker is gone" kind of thing. Semantically I could see that being more like require than assert, but don't have a strong opinion.