[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20698
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171989658

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,106 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the from and until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+
+  /**
+   * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
+   * is not set or is set less than or equal to the number of `topicPartitions` that we're
+   * going to consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions.
+   * If `minPartitions` is set higher than the number of our `topicPartitions`, then we will
+   * split up the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   */
+  def getRanges(
+      fromOffsets: PartitionOffsetMap,
+      untilOffsets: PartitionOffsetMap,
+      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+    val offsetRanges = partitionsToRead.toSeq.map { tp =>
+      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
+    }.filter(_.size > 0)
+
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+      // Assign preferred executor locations to each range such that the same topic-partition is
+      // preferentially read from the same executor and the KafkaConsumer can be reused.
+      offsetRanges.map { range =>
+        range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
+      }
+    } else {
+
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
+      val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+      offsetRanges.flatMap { range =>
+        // Split the current range into subranges as close to the ideal range size
+        val rangeSize = range.untilOffset - range.fromOffset
+        val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt
+
+        (0 until numSplitsInRange).map { i =>
+          val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange)
+          val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange)
+          KafkaOffsetRange(
+            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        }
+      }
+    }
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
+    def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+    val numExecutors = executorLocations.length
+    if (numExecutors > 0) {
+      // This allows cached KafkaConsumers in the executors to be re-used to read the same
+      // partition in every batch.
+      Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+    } else None
+  }
+}
+
+private[kafka010] object KafkaOffsetRangeCalculator {
+
+  def apply(options: DataSou
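To make the arithmetic in this revision concrete, here is a minimal, self-contained Scala sketch of the split logic above. The inputs (tp1 with 9 pending offsets, tp2 with 3, `minPartitions = 4`) are made up, and plain tuples stand in for `TopicPartition`/`KafkaOffsetRange`:

```
object SplitSketch extends App {
  // Hypothetical batch: tp1 covers offsets [1, 10), tp2 covers [0, 3), minPartitions = 4.
  val ranges = Seq(("tp1", 1L, 10L), ("tp2", 0L, 3L))
  val minPartitions = 4

  val totalSize = ranges.map { case (_, from, until) => until - from }.sum  // 12
  val idealRangeSize = totalSize.toDouble / minPartitions                   // 3.0

  val splits = ranges.flatMap { case (tp, from, until) =>
    val rangeSize = until - from
    // tp1: round(9 / 3.0) = 3 splits; tp2: round(3 / 3.0) = 1 split
    val numSplits = math.round(rangeSize.toDouble / idealRangeSize).toInt
    (0 until numSplits).map { i =>
      val start = from + rangeSize * (i.toDouble / numSplits)
      val end = from + rangeSize * ((i.toDouble + 1) / numSplits)
      (tp, start.toLong, end.toLong)
    }
  }

  // Prints: List((tp1,1,4), (tp1,4,7), (tp1,7,10), (tp2,0,3))
  println(splits)
}
```

With these inputs every range rounds to at least one split; as the doc comment above notes, the resulting task count is only approximately `minPartitions`.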
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171987888

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,106 @@ (same new file as quoted above; excerpt around the commented line)
+      offsetRanges.flatMap { range =>
+        // Split the current range into subranges as close to the ideal range size
+        val rangeSize = range.untilOffset - range.fromOffset
+        val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt
--- End diff --
nit: `range.size`, you may remove `rangeSize` above
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r17198

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,106 @@ (same new file as quoted above; excerpt around the commented line)
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
--- End diff --
nit: `map(_.size).sum`
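Both spellings compute the same sum, given that `KafkaOffsetRange.size` is the offset delta. A sketch of the case class as it appears to be used in this diff (the `size` member is implied by the `.filter(_.size > 0)` call quoted above; its exact definition is an assumption):

```
import org.apache.kafka.common.TopicPartition

case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  def size: Long = untilOffset - fromOffset
}

// offsetRanges.map(o => o.untilOffset - o.fromOffset).sum == offsetRanges.map(_.size).sum
```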
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988510

Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -0,0 +1,136 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
+
+  def testWithMinPartitions(name: String, minPartition: Int)
+      (f: KafkaOffsetRangeCalculator => Unit): Unit = {
+    val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava)
+    test(s"with minPartition = $minPartition: $name") {
+      f(KafkaOffsetRangeCalculator(options))
+    }
+  }
+
+  test("with no minPartition: N TopicPartitions to N offset ranges") {
+    val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1),
+        untilOffsets = Map(tp1 -> 2)) ==
+      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1),
+        untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) ==
+      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+        untilOffsets = Map(tp1 -> 2)) ==
+      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+        untilOffsets = Map(tp1 -> 2),
+        executorLocations = Seq("location")) ==
+      Seq(KafkaOffsetRange(tp1, 1, 2, Some("location"))))
+  }
+
+  test("with no minPartition: empty ranges ignored") {
+    val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+        untilOffsets = Map(tp1 -> 2, tp2 -> 1)) ==
+      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+  }
+
+  testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc =>
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
+        untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) ==
+      Seq(
+        KafkaOffsetRange(tp1, 1, 2, None),
+        KafkaOffsetRange(tp2, 1, 2, None),
+        KafkaOffsetRange(tp3, 1, 2, None)))
+  }
+
+  testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1),
+        untilOffsets = Map(tp1 -> 5)) ==
+      Seq(
+        KafkaOffsetRange(tp1, 1, 2, None),
+        KafkaOffsetRange(tp1, 2, 3, None),
+        KafkaOffsetRange(tp1, 3, 4, None),
+        KafkaOffsetRange(tp1, 4, 5, None)))
+
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 1),
+        untilOffsets = Map(tp1 -> 5),
+        executorLocations = Seq("location")) ==
+      Seq(
+        KafkaOffsetRange(tp1, 1, 2, None),
+        KafkaOffsetRange(tp1, 2, 3, None),
+        KafkaOffsetRange(tp1, 3, 4, None),
+        KafkaOffsetRange(tp1, 4, 5, None)))  // location pref not set when minPartition is set
+  }
+
+  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
--- End diff --
can you also add a test:
```
fromOffsets = Map(tp1 -> 1),
untilOffsets = Map(tp1 -> 10)
minPartitions = 3
```
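For reference, the requested case exercises a single partition with 9 pending offsets; under the splitting logic in the newer diff (total size 9, ideal range size 3), it would presumably expect three even ranges. A sketch in the suite's existing style — the test name is made up:

```
testWithMinPartitions("1 large TopicPartition to N even offset ranges", 3) { calc =>
  assert(
    calc.getRanges(
      fromOffsets = Map(tp1 -> 1),
      untilOffsets = Map(tp1 -> 10)) ==
    Seq(
      KafkaOffsetRange(tp1, 1, 4, None),
      KafkaOffsetRange(tp1, 4, 7, None),
      KafkaOffsetRange(tp1, 7, 10, None)))
}
```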
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988062

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,106 @@ (same new file as quoted above)
...
+  def apply(options: DataS
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171988112

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,106 @@ (same new file as quoted above)
...
+  def apply(options: DataS
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171983185

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,102 @@ (an earlier revision of the same file; excerpt around the commented line)
+        (0 until numSplitsInRange).map { i =>
+          val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange)
+          val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange)
+          KafkaOffsetRange(
+            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        }
+
--- End diff --
nit: extra line
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750758

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
+      offsetRanges.flatMap { offsetRange =>
+        val tp = offsetRange.topicPartition
+        val size = offsetRange.untilOffset - offsetRange.fromOffset
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
--- End diff --
I rewrote this completely using the code from `sparkContext.parallelize` to make splits:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L123
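The `positions` helper in the linked `ParallelCollectionRDD.slice` computes slice boundaries like this (reproduced from Spark's source for context; exact line numbers and minor details may differ across versions):

```
// Slices [0, length) into numSlices contiguous pieces of nearly equal size;
// remainders are spread across slices rather than all landing on the last one.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

positions(10, 3).toList  // List((0,3), (3,6), (6,10)) -- sizes 3, 3, 4
```

The rewritten `getRanges` in the newer diff at the top of this thread applies the same proportional arithmetic per offset range, using `Double` fractions of the range instead of integer index positions.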
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750580

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --
`fromOffsets` here will contain the initial offsets of new partitions. See how `fromOffsets` is set with `startOffsets + newPartitionInitialOffsets`.
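A small illustration of that point, with hypothetical offsets (tp2 stands for a partition that appeared during this batch):

```
import org.apache.kafka.common.TopicPartition

val tp1 = new TopicPartition("topic", 0)  // already being read
val tp2 = new TopicPartition("topic", 1)  // discovered this batch

val startOffsets = Map(tp1 -> 5L)
val newPartitionInitialOffsets = Map(tp2 -> 0L)

// The caller builds fromOffsets as startOffsets + newPartitionInitialOffsets,
// so the intersection below keeps the new partition.
val fromOffsets = startOffsets ++ newPartitionInitialOffsets
val untilOffsets = Map(tp1 -> 9L, tp2 -> 3L)

println(untilOffsets.keySet.intersect(fromOffsets.keySet))  // Set(topic-0, topic-1)
```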
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750437

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
...
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
--- End diff --
Rewritten. I don't want to rely on this default value of 0, as @jose-torres expressed concern earlier. So I rewrote this to explicitly check whether minPartitions has been set or not.
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171741765

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt)
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
...
+    } else {
+
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
+      offsetRanges.flatMap { offsetRange =>
+        val tp = offsetRange.topicPartition
+        val size = offsetRange.untilOffset - offsetRange.fromOffset
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
+        var remaining = size
+        var startOffset = offsetRange.fromOffset
+        (0 until parts).map { part =>
+          // Fine to do integer division. Last partition will consume all the round off errors
+          val thisPartition = remaining / (parts - part)
+          remaining -= thisPartition
+          val endOffset = startOffset + thisPartition
+          val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None)
+          startOffset = endOffset
+          offsetRange
+        }
+      }
+    }
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
+    def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+    val numExecutors = executorLocations.length
+    if (numExecutors > 0) {
+      // This allows cached KafkaConsumers in the executors to be re-used to read the same
+      // partition in every batch.
+      Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+    } else None
+  }
+}
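The "round off errors" remark is easiest to see with numbers. A sketch of the loop in isolation, with a made-up range of 10 offsets split into 3 parts:

```
val size = 10L
val parts = 3

var remaining = size
val splitSizes = (0 until parts).map { part =>
  val thisPartition = remaining / (parts - part)  // integer division
  remaining -= thisPartition
  thisPartition
}

// part 0: 10 / 3 = 3  (remaining 7)
// part 1:  7 / 2 = 3  (remaining 4)
// part 2:  4 / 1 = 4  -- the last split absorbs the remainder
println(splitSizes)  // Vector(3, 3, 4)
```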
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732779

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -370,8 +361,14 @@
   override def close(): Unit = {
     // Indicate that we're no longer using this consumer
--- End diff --
maybe remove this?
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733516

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
--- End diff --
yeah, a comment about how this is calculating the `weight` of partitions to assign to this topic would help.
In addition, the sum of `parts` after this calculation will be `>= minPartitions`
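A worked example of the weight computation (made-up sizes, with one heavily skewed partition), showing how the clamp to 1 can push the total past `minPartitions`:

```
val sizes = Seq(100L, 1L, 1L)  // pending offsets per topic partition
val minPartitions = 4
val totalSize = sizes.sum      // 102

val parts = sizes.map { size =>
  // each partition's share of the task budget, proportional to its share of the data
  math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
}

// 100/102 * 4 ~= 3.92 -> 4;   1/102 * 4 ~= 0.04 -> rounds to 0, clamped to 1
println(parts)      // List(4, 1, 1)
println(parts.sum)  // 6, i.e. more than minPartitions
```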
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733038

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
--- End diff --
I don't think we need the first check. `offsetRanges.size` should be greater than 0, right? Otherwise we wouldn't have called into this.
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732729

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -320,28 +300,39 @@
 }

 /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
-    range: KafkaOffsetRange,
-    preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+    offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {

-  override def preferredLocations(): Array[String] = preferredLoc.toArray
+  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

   override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
 }

 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
+
+  private val consumer = {
+    if (!reuseKafkaConsumer) {
+      // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
--- End diff --
nit: `We use 'assign' here, hence don't need to ...`
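For context on the 'assign' remark: the reader pins explicit partitions rather than joining a consumer group, so group-rebalancing concerns don't apply. A minimal kafka-clients sketch of the difference (the broker address is a placeholder):

```
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("topic", 0)

// `assign` (what the Spark reader uses): explicit partition, no consumer group,
// no rebalancing; the caller controls offsets via seek().
consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, 42L)  // jump to the start of the assigned offset range

// By contrast, `subscribe(topics)` would join a group and let the broker
// assign and rebalance partitions -- not what a deterministic range read wants.
```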
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171733181

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -0,0 +1,105 @@ (an earlier revision of the same file; excerpt around the commented line)
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --
was this check here before? What if there are new topic partitions? Are we missing those, because they may not exist in fromOffsets?
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732183

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -199,10 +179,10 @@
     // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
     // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
     // (KAFKA-1894).
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --
Assertions can be turned off
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732015

Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -199,10 +179,10 @@ (same hunk as in the previous message)
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --
not much really. assert throws AssertionError and require throws IllegalArgumentException. Just a matter of preference. I can revert this change.
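For reference, the behavioral difference being discussed, in plain Scala:

```
def check(x: Int): Unit = {
  assert(x > 0)   // throws java.lang.AssertionError; can be elided by scalac (-Xelide-below)
  require(x > 0)  // throws java.lang.IllegalArgumentException; never elided
}
```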
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171698925

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala ---

@@ -0,0 +1,105 @@
+/* ... Apache License 2.0 header (as above) ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+/**
+ * Class to calculate offset ranges to process based on the from and until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+
+  /** Calculate the offset ranges that we are going to process this batch. (doc as above) */
+  def getRanges(
+      fromOffsets: PartitionOffsetMap,
+      untilOffsets: PartitionOffsetMap,
+      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+    val offsetRanges = partitionsToRead.toSeq.map { tp =>
+      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
+    }
+
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
+      // Assign preferred executor locations to each range such that the same topic-partition is
+      // always read from the same executor and the KafkaConsumer can be reused
+      offsetRanges.map { range =>
+        range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
+      }
+    } else {
+
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
+      offsetRanges.flatMap { offsetRange =>
+        val tp = offsetRange.topicPartition
+        val size = offsetRange.untilOffset - offsetRange.fromOffset
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt

--- End diff --

It's hard to understand why this number is being calculated as it is. I think it's correct, but a comment explaining why this is the right number to divvy would help.
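A self-contained sketch of the arithmetic being questioned (the sizes are invented for illustration): each topic-partition receives a share of `minPartitions` proportional to its fraction of the batch's total records, with a floor of 1 so every non-empty partition still yields at least one task:

object DivvyExample extends App {
  val minPartitions = 10
  // (topic-partition, records in this batch) -- hypothetical sizes
  val sizes = Seq("t-0" -> 800L, "t-1" -> 150L, "t-2" -> 50L)
  val totalSize = sizes.map(_._2).sum // 1000

  sizes.foreach { case (tp, size) =>
    // Proportional share of minPartitions, rounded, but at least 1 split.
    val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
    println(s"$tp -> $parts splits") // t-0 -> 8, t-1 -> 2, t-2 -> 1
  }
  // Total comes to 11 splits -- approximately minPartitions, as the scaladoc promises.
}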
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171685190

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala ---

@@ -0,0 +1,105 @@
+/* ... Apache License 2.0 header (as above) ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+/**
+ * Class to calculate offset ranges to process based on the from and until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+
+  /** Calculate the offset ranges that we are going to process this batch. (doc as above) */
+  def getRanges(
+      fromOffsets: PartitionOffsetMap,
+      untilOffsets: PartitionOffsetMap,
+      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+    val offsetRanges = partitionsToRead.toSeq.map { tp =>
+      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
+    }
+
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
+      // Assign preferred executor locations to each range such that the same topic-partition is
+      // always read from the same executor and the KafkaConsumer can be reused
+      offsetRanges.map { range =>
+        range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
+      }
+    } else {
+
+      // Splits offset ranges with relatively large amount of data to smaller ones.
+      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
+      offsetRanges.flatMap { offsetRange =>
+        val tp = offsetRange.topicPartition
+        val size = offsetRange.untilOffset - offsetRange.fromOffset
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
+        var remaining = size
+        var startOffset = offsetRange.fromOffset
+        (0 until parts).map { part =>
+          // Fine to do integer division. Last partition will consume all the round off errors
+          val thisPartition = remaining / (parts - part)
+          remaining -= thisPartition
+          val endOffset = startOffset + thisPartition
+          val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None)
+          startOffset = endOffset
+          offsetRange
+        }
+      }
+    }
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
+    def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+    val numExecutors = executorLocations.length
+    if (numExecutors > 0) {
+      // This allows cached KafkaConsumers in the executors to be re-used to read the same
+      // partition in every batch.
+      Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+    } else None
+  }
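The splitting loop quoted above divides one offset range into `parts` contiguous sub-ranges using integer division, letting the later splits absorb the rounding remainder. A standalone sketch of just that loop (offsets invented):

object EvenSplitExample extends App {
  val fromOffset = 0L
  val untilOffset = 10L // 10 records to spread over 3 splits
  val parts = 3

  var remaining = untilOffset - fromOffset
  var startOffset = fromOffset
  val splits = (0 until parts).map { part =>
    // Integer division; the remainder is pushed onto the splits still to come,
    // so the sizes work out to 3, 3, 4 and no record is lost to rounding.
    val thisPartition = remaining / (parts - part)
    remaining -= thisPartition
    val endOffset = startOffset + thisPartition
    val range = (startOffset, endOffset)
    startOffset = endOffset
    range
  }
  println(splits) // Vector((0,3), (3,6), (6,10))
}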
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171685001

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala ---

@@ -0,0 +1,105 @@
+/* ... Apache License 2.0 header (as above) ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+/**
+ * Class to calculate offset ranges to process based on the from and until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+
+  /**
+   * Calculate the offset ranges that we are going to process this batch. If `numPartitions`

--- End diff --

nit: s/numPartitions/minPartitions/
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171685641

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala ---

@@ -0,0 +1,105 @@
+/* ... Apache License 2.0 header (as above) ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+/**
+ * Class to calculate offset ranges to process based on the from and until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+
+  /** Calculate the offset ranges that we are going to process this batch. (doc as above) */
+  def getRanges(
+      fromOffsets: PartitionOffsetMap,
+      untilOffsets: PartitionOffsetMap,
+      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
+    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+    val offsetRanges = partitionsToRead.toSeq.map { tp =>
+      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
+    }
+
+    // If minPartitions not set or there are enough partitions to satisfy minPartitions
+    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
+      // Assign preferred executor locations to each range such that the same topic-partition is
+      // always read from the same executor and the KafkaConsumer can be reused

--- End diff --

I worry that "always" is misleading here. It's not guaranteed that the same executor will run the partition or that the KafkaConsumer can be reused.
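For reference, the location preference under discussion is computed by hashing the topic-partition onto the executor list (the `getLocation` helper quoted earlier in the thread). A minimal sketch of that scheme, with invented executor names; as the comment above notes, the result is only a scheduling *preference*, not a guarantee:

import org.apache.kafka.common.TopicPartition

object PreferredLocationExample extends App {
  // Mirrors the getLocation helper from the diff.
  def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b

  def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
    val numExecutors = executorLocations.length
    if (numExecutors > 0) {
      // The same partition hashes to the same executor in every batch, so a cached
      // KafkaConsumer there can usually be reused -- when Spark honors the hint.
      Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
    } else None
  }

  val executors = Seq("exec-1", "exec-2", "exec-3") // hypothetical
  println(getLocation(new TopicPartition("t", 0), executors))
  println(getLocation(new TopicPartition("t", 0), executors)) // same answer every batch
}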
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171441133

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala ---

@@ -0,0 +1,105 @@
+/* ... Apache License 2.0 header (as above) ... */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) {

--- End diff --

add docs.
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20698

[SPARK-23541][SS] Allow Kafka source to read data with greater parallelism than the number of topic-partitions

## What changes were proposed in this pull request?

Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some cases, it may be beneficial to read the data with greater parallelism, that is, with a greater number of partitions/tasks. That means the offset ranges must be divided into smaller ranges such that the number of records per partition ~= total records in batch / desired number of partitions. This would also balance out any data skew between topic-partitions. In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism.

## How was this patch tested?

New tests in KafkaMicroBatchV2SourceSuite.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23541

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20698.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20698

commit ebb9b51c51a4411811a7e0e09fff8f8608faa017
Author: Tathagata Das
Date: 2018-03-01T01:28:32Z

    Implemented
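As a usage illustration of the option this PR adds (the broker address, topic name, and checkpoint path below are placeholders, not from the PR), a streaming query can opt into higher read parallelism like this:

import org.apache.spark.sql.SparkSession

object MinPartitionsExample extends App {
  val spark = SparkSession.builder().appName("kafka-min-partitions").getOrCreate()

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
    .option("subscribe", "events")                   // placeholder topic
    // Ask for at least 60 Spark tasks even if the topic has, say, 12 partitions;
    // large or skewed topic-partitions are split into smaller offset ranges.
    .option("minPartitions", "60")
    .load()

  df.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/kafka-demo") // placeholder path
    .start()
    .awaitTermination()
}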