[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105575677

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest {
         assert(query.exception.isEmpty)
       }
     
    +  test("test to get offsets from case insensitive parameters") {
    --- End diff --

Thanks!

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17209
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105575111

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest {
         assert(query.exception.isEmpty)
       }
     
    +  test("test to get offsets from case insensitive parameters") {
    --- End diff --

nit: remove `test to`. I will fix it when merging your PR.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105553128

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -128,18 +123,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
             .map { k => k.drop(6).toString -> parameters(k) }
             .toMap
     
    -    val startingRelationOffsets =
    -      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("earliest") => EarliestOffsetRangeLimit
    -        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    -        case None => EarliestOffsetRangeLimit
    +    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
    +        case earliest @ EarliestOffsetRangeLimit => earliest
    --- End diff --

Much simpler.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548820

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -606,6 +607,36 @@ class KafkaSourceSuite extends KafkaSourceTest {
         assert(query.exception.isEmpty)
       }
     
    +  for((optionKey, optionValue, answer) <- Seq(
    +    (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
    +    (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
    +    (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
    +      SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
    +    test(s"test offsets containing uppercase characters (${answer.getClass.getSimpleName})") {
    +      val offset = getKafkaOffsetRangeLimit(
    +        Map(optionKey -> optionValue),
    +        optionKey,
    +        answer
    +      )
    +
    +      assert(offset == answer)
    --- End diff --

nit: `==` => `===`
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548818

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -606,6 +607,36 @@ class KafkaSourceSuite extends KafkaSourceTest {
         assert(query.exception.isEmpty)
       }
     
    +  for((optionKey, optionValue, answer) <- Seq(
    --- End diff --

nit: move the `for` loop into the `test`. There's no need to create many tests here.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548819

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -606,6 +607,36 @@ class KafkaSourceSuite extends KafkaSourceTest {
         assert(query.exception.isEmpty)
       }
     
    +  for((optionKey, optionValue, answer) <- Seq(
    +    (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
    +    (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
    +    (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
    +      SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
    +    test(s"test offsets containing uppercase characters (${answer.getClass.getSimpleName})") {
    +      val offset = getKafkaOffsetRangeLimit(
    +        Map(optionKey -> optionValue),
    +        optionKey,
    +        answer
    +      )
    +
    +      assert(offset == answer)
    +    }
    +  }
    +
    +  for((optionKey, answer) <- Seq(
    --- End diff --

Same as above.
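The two test nits above (one `test` wrapping the `for` loop, and `===` instead of `==`) can be sketched without ScalaTest. The snippet below is a minimal, dependency-free sketch under stated assumptions: the offset types are simplified stand-ins (the specific variant keeps the raw JSON string rather than a parsed `Map[TopicPartition, Long]`), and `getKafkaOffsetRangeLimit` is a hypothetical re-implementation that only mirrors the parameter list used in the test. In the real suite the loop body would sit inside a single `test(...)` block and compare with ScalaTest's `===`.

```scala
import java.util.Locale

object OffsetOptionTable {
  // Simplified stand-ins for Spark's offset range types (assumption: the
  // specific variant keeps the raw JSON instead of parsed partition offsets).
  sealed trait KafkaOffsetRangeLimit
  case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
  case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
  final case class SpecificOffsetRangeLimit(json: String) extends KafkaOffsetRangeLimit

  val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
  val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"

  // Hypothetical helper with the same parameter list as the call in the test:
  // keywords are compared case-insensitively, any other value is kept verbatim.
  def getKafkaOffsetRangeLimit(
      params: Map[String, String],
      offsetOptionKey: String,
      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit =
    params.get(offsetOptionKey).map(_.trim) match {
      case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => LatestOffsetRangeLimit
      case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => EarliestOffsetRangeLimit
      case Some(json) => SpecificOffsetRangeLimit(json)
      case None => defaultOffsets
    }

  // One table, one loop: every row is checked inside a single check instead of
  // generating a separate test per row. Returns the number of rows checked.
  def checkAll(): Int = {
    val cases: Seq[(String, String, KafkaOffsetRangeLimit)] = Seq(
      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
        SpecificOffsetRangeLimit("""{"topic-A":{"0":23}}""")))
    for ((optionKey, optionValue, answer) <- cases) {
      val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
      assert(offset == answer, s"$optionKey -> $optionValue parsed as $offset")
    }
    cases.size
  }
}
```

A failing row reports its key and value in the assertion message, which is the main thing lost when many generated tests are collapsed into one.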
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548749

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -128,18 +123,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
             .map { k => k.drop(6).toString -> parameters(k) }
             .toMap
     
    -    val startingRelationOffsets =
    -      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("earliest") => EarliestOffsetRangeLimit
    -        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    -        case None => EarliestOffsetRangeLimit
    +    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
    +        case earliest @ EarliestOffsetRangeLimit => earliest
    --- End diff --

`startingRelationOffsets` can't be `latest`, since that is already checked in `validateBatchOptions`. Why not just:

```Scala
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
  caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
assert(startingRelationOffsets != LatestOffsetRangeLimit)
```
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548762

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -388,34 +383,34 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     
       private def validateBatchOptions(caseInsensitiveParams: Map[String, String]) = {
         // Batch specific options
    -    caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -      case Some("earliest") => // good to go
    -      case Some("latest") =>
    +    KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
    +      case EarliestOffsetRangeLimit => // good to go
    +      case LatestOffsetRangeLimit =>
             throw new IllegalArgumentException("starting offset can't be latest " +
               "for batch queries on Kafka")
    -      case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
    -        .partitionOffsets.foreach {
    +      case specific: SpecificOffsetRangeLimit =>
    +        specific.partitionOffsets.foreach {
               case (tp, off) if off == KafkaOffsetRangeLimit.LATEST =>
                 throw new IllegalArgumentException(s"startingOffsets for $tp can't " +
                   "be latest for batch queries on Kafka")
               case _ => // ignore
             }
    -      case _ => // default to earliest
         }
     
    -    caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -      case Some("earliest") =>
    +    KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
    +      case EarliestOffsetRangeLimit =>
             throw new IllegalArgumentException("ending offset can't be earliest " +
               "for batch queries on Kafka")
    -      case Some("latest") => // good to go
    -      case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
    -        .partitionOffsets.foreach {
    +      case LatestOffsetRangeLimit => // good to go
    +      case specific: SpecificOffsetRangeLimit =>
    --- End diff --

nit: `case SpecificOffsetRangeLimit(partitionOffsets) =>`
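The nit asks for a constructor (extractor) pattern instead of a typed pattern, so the partition map is bound directly and the `specific.` prefix disappears. A minimal sketch of the starting-offset check in that style follows. The types are simplified stand-ins (a `(topic, partition)` tuple replaces Kafka's `TopicPartition`), and the sentinel value `-1L` used for `LATEST` is an assumption made for illustration.

```scala
object BatchOptionValidation {
  // Simplified stand-ins; a (topic, partition) tuple replaces Kafka's TopicPartition.
  sealed trait KafkaOffsetRangeLimit
  case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
  case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
  final case class SpecificOffsetRangeLimit(partitionOffsets: Map[(String, Int), Long])
    extends KafkaOffsetRangeLimit

  // Assumed per-partition sentinel meaning "latest".
  val LATEST: Long = -1L

  def validateStartingOffsets(limit: KafkaOffsetRangeLimit): Unit = limit match {
    case EarliestOffsetRangeLimit => // good to go
    case LatestOffsetRangeLimit =>
      throw new IllegalArgumentException(
        "starting offset can't be latest for batch queries on Kafka")
    // Extractor pattern: binds the map directly instead of
    // `case specific: SpecificOffsetRangeLimit => specific.partitionOffsets ...`.
    case SpecificOffsetRangeLimit(partitionOffsets) =>
      partitionOffsets.foreach {
        case (tp, off) if off == LATEST =>
          throw new IllegalArgumentException(
            s"startingOffsets for $tp can't be latest for batch queries on Kafka")
        case _ => // ignore
      }
  }
}
```

Because the trait is sealed, the compiler can also warn if a case is missing from the match, which is what makes the old catch-all `case _ => // default to earliest` unnecessary.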
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548753

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -128,18 +123,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
             .map { k => k.drop(6).toString -> parameters(k) }
             .toMap
     
    -    val startingRelationOffsets =
    -      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("earliest") => EarliestOffsetRangeLimit
    -        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    -        case None => EarliestOffsetRangeLimit
    +    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
    +        case earliest @ EarliestOffsetRangeLimit => earliest
    +        case specific @ SpecificOffsetRangeLimit(_) => specific
    +        case _ => EarliestOffsetRangeLimit
           }
     
    -    val endingRelationOffsets =
    -      caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("latest") => LatestOffsetRangeLimit
    -        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    -        case None => LatestOffsetRangeLimit
    +    val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    +      ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
    +        case latest @ LatestOffsetRangeLimit => latest
    --- End diff --

Same as above.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105548760

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -388,34 +383,34 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     
       private def validateBatchOptions(caseInsensitiveParams: Map[String, String]) = {
         // Batch specific options
    -    caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -      case Some("earliest") => // good to go
    -      case Some("latest") =>
    +    KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
    +      case EarliestOffsetRangeLimit => // good to go
    +      case LatestOffsetRangeLimit =>
             throw new IllegalArgumentException("starting offset can't be latest " +
               "for batch queries on Kafka")
    -      case Some(json) => (SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)))
    -        .partitionOffsets.foreach {
    +      case specific: SpecificOffsetRangeLimit =>
    --- End diff --

nit: `case SpecificOffsetRangeLimit(partitionOffsets) =>`
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105528025

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -450,10 +445,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     
     private[kafka010] object KafkaSourceProvider {
       private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
    -  private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
    -  private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
    +  private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
    +  private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
       private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
    --- End diff --

Changed so that the unit test can access these keys.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105315774

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -83,9 +83,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
             .toMap
     
         val startingStreamOffsets =
    -      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("latest") => LatestOffsetRangeLimit
    -        case Some("earliest") => EarliestOffsetRangeLimit
    +      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
    +        case a @ Some(offset) if offset.toLowerCase.equals("latest") => LatestOffsetRangeLimit
    --- End diff --

Could you just write a utility method to get offsets from `caseInsensitiveParams`?
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17209#discussion_r105313971

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -83,9 +83,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
             .toMap
     
         val startingStreamOffsets =
    -      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    -        case Some("latest") => LatestOffsetRangeLimit
    -        case Some("earliest") => EarliestOffsetRangeLimit
    +      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
    +        case a @ Some(offset) if offset.toLowerCase.equals("latest") => LatestOffsetRangeLimit
    --- End diff --

`case a @ Some(offset) if offset.toLowerCase.equals("latest")` -> `case Some(offset) if offset.toLowerCase == "latest"`. Please also fix the other places.
[GitHub] spark pull request #17209: [SPARK-19853][SS] uppercase kafka topics fail whe...
GitHub user uncleGen opened a pull request: https://github.com/apache/spark/pull/17209

[SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets

## What changes were proposed in this pull request?

When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if `startingOffsets` is set to an explicit set of topics/partitions in JSON and the topic names contain uppercase characters. When `startingOffsets` is constructed, the original string value from the options is lowercased with `toLowerCase` so that matching on "earliest" and "latest" is case-insensitive. However, the same lowercased string is then passed as JSON to `SpecificOffsets` in the final `case Some(json)` branch, so by the time assignments are made with the underlying `KafkaConsumer`, the topic names no longer match the ones the user specified.

KafkaSourceProvider.scala:

```scala
val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
```

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uncleGen/spark SPARK-19853

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17209.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17209

commit e2a26bf8fb8554fb030e7f5bd2197befb9ed55e2
Author: uncleGen
Date: 2017-03-08T11:59:17Z

    Uppercase Kafka topics fail when startingOffsets are SpecificOffsets
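To make the failure described in the PR concrete, here is a minimal sketch of the pre-fix matching logic with simplified stand-ins. The object and method names are hypothetical, and `SpecificOffsets` here keeps the raw JSON string instead of calling `JsonUtils.partitionOffsets`. Lowercasing the whole option value makes the keyword match case-insensitive, but it also rewrites the topic name before the JSON reaches the last branch.

```scala
import java.util.Locale

object LowercaseBugDemo {
  // Simplified stand-ins for the pre-fix offset types. Assumption: SpecificOffsets
  // keeps the raw JSON string instead of the result of JsonUtils.partitionOffsets.
  sealed trait StartingOffsets
  case object LatestOffsets extends StartingOffsets
  case object EarliestOffsets extends StartingOffsets
  final case class SpecificOffsets(json: String) extends StartingOffsets

  // Mirrors the match quoted in the PR description: the whole option value is
  // lowercased before matching (Locale.ROOT keeps the demo locale-independent),
  // so the JSON that reaches the last branch is lowercased too.
  def startingOffsetsBeforeFix(params: Map[String, String]): StartingOffsets =
    params.get("startingoffsets").map(_.trim.toLowerCase(Locale.ROOT)) match {
      case Some("latest")   => LatestOffsets
      case Some("earliest") => EarliestOffsets
      case Some(json)       => SpecificOffsets(json)
      case None             => LatestOffsets
    }
}
```

For example, passing `{"topic-A":{"0":23}}` yields a `SpecificOffsets` holding `{"topic-a":{"0":23}}`, so the consumer would be assigned a partition of `topic-a` rather than `topic-A`. The fix keeps the trimmed value as-is and lowercases it only inside the keyword comparisons.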