[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20698


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171989658
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetRangeCalculator {
+
+  def apply(options: DataSou

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171987888
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
--- End diff --

nit: `range.size`, you may remove `rangeSize` above
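
Below is a self-contained sketch of the split loop with that nit applied. `KafkaOffsetRange` here is a stand-in case class (with `size` defined as `untilOffset - fromOffset`, as the `.filter(_.size > 0)` call above implies) and the sample values are made up, so this is illustrative only, not the committed code.

```
import org.apache.kafka.common.TopicPartition

// Stand-in for the class in the patch; `size` mirrors the member implied by `.filter(_.size > 0)`.
case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  def size: Long = untilOffset - fromOffset
}

val tp = new TopicPartition("topic", 0)
val idealRangeSize = 25.0
val offsetRanges = Seq(KafkaOffsetRange(tp, 0L, 100L, None))

val splits = offsetRanges.flatMap { range =>
  // `range.size` replaces the local `rangeSize` variable from the diff
  val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
  (0 until numSplitsInRange).map { i =>
    val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
    val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
    KafkaOffsetRange(range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
  }
}
// splits: four ranges of 25 offsets each -> [0,25), [25,50), [50,75), [75,100)
```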


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r17198
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
--- End diff --

nit: `map(_.size).sum`


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988510
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
+
+  def testWithMinPartitions(name: String, minPartition: Int)
+  (f: KafkaOffsetRangeCalculator => Unit): Unit = {
+val options = new DataSourceOptions(Map("minPartitions" -> 
minPartition.toString).asJava)
+test(s"with minPartition = $minPartition: $name") {
+  f(KafkaOffsetRangeCalculator(options))
+}
+  }
+
+
+  test("with no minPartition: N TopicPartitions to N offset ranges") {
+val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 2)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2),
+executorLocations = Seq("location")) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, Some("location"
+  }
+
+  test("with no minPartition: empty ranges ignored") {
+val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 1)) ==
+  Seq(KafkaOffsetRange(tp1, 1, 2, None)))
+  }
+
+  testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc 
=>
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
+untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) ==
+  Seq(
+KafkaOffsetRange(tp1, 1, 2, None),
+KafkaOffsetRange(tp2, 1, 2, None),
+KafkaOffsetRange(tp3, 1, 2, None)))
+  }
+
+  testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 5)) ==
+  Seq(
+KafkaOffsetRange(tp1, 1, 2, None),
+KafkaOffsetRange(tp1, 2, 3, None),
+KafkaOffsetRange(tp1, 3, 4, None),
+KafkaOffsetRange(tp1, 4, 5, None)))
+
+assert(
+  calc.getRanges(
+fromOffsets = Map(tp1 -> 1),
+untilOffsets = Map(tp1 -> 5),
+executorLocations = Seq("location")) ==
+Seq(
+  KafkaOffsetRange(tp1, 1, 2, None),
+  KafkaOffsetRange(tp1, 2, 3, None),
+  KafkaOffsetRange(tp1, 3, 4, None),
+  KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set 
when minPartition is set
+  }
+
+  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) 
{ calc =>
--- End diff --

can you also add a test:
```
fromOffsets = Map(tp1 -> 1),
untilOffsets = Map(tp1 -> 10)
minPartitions = 3
```
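
A possible shape for that case, reusing the suite's `testWithMinPartitions` helper and `tp1` fixture shown above. With 9 offsets and `minPartitions = 3`, the calculator's even-split math would give three ranges of 3 offsets each, but the exact expected values here are an assumption, not the test that was actually committed.

```
  testWithMinPartitions("1 TopicPartition with 9 offsets to 3 offset ranges", 3) { calc =>
    assert(
      calc.getRanges(
        fromOffsets = Map(tp1 -> 1),
        untilOffsets = Map(tp1 -> 10)) ==
      Seq(
        KafkaOffsetRange(tp1, 1, 4, None),
        KafkaOffsetRange(tp1, 4, 7, None),
        KafkaOffsetRange(tp1, 7, 10, None)))
  }
```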


---


[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988062
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetRangeCalculator {
+
+  def apply(options: DataS

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171988112
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetRangeCalculator {
+
+  def apply(options: DataS

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171983185
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+
--- End diff --

nit: extra line


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750758
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
--- End diff --

I rewrote this completely using the code used by `sparkContext.parallelize` to make splits.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L123
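
For reference, a minimal standalone sketch of that `ParallelCollectionRDD`-style slicing applied to a single offset range; the function name and signature are illustrative, not the identifiers used in the PR.

```
// Split [fromOffset, untilOffset) into numSlices contiguous subranges using the same
// integer arithmetic as ParallelCollectionRDD.slice: boundary i sits at i * length / numSlices,
// so rounding error is spread across slices and the last slice absorbs the remainder.
def splitRange(fromOffset: Long, untilOffset: Long, numSlices: Int): Seq[(Long, Long)] = {
  val length = untilOffset - fromOffset
  (0 until numSlices).map { i =>
    val start = fromOffset + (i * length) / numSlices
    val end = fromOffset + ((i + 1) * length) / numSlices
    (start, end)
  }
}

splitRange(0L, 10L, 3)  // Seq((0,3), (3,6), (6,10))
```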


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750580
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --

`fromOffsets` here will contain the initial offsets of new partitions. See how `fromOffsets` is set with `startOffsets + newPartitionInitialOffsets`.
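
A small illustration of that composition with made-up topics and offsets (the real maps come from the reader's tracked state), showing why newly discovered partitions survive the keySet intersection with `untilOffsets`:

```
import org.apache.kafka.common.TopicPartition

val startOffsets = Map(new TopicPartition("t", 0) -> 100L)              // partitions carried over from the last batch
val newPartitionInitialOffsets = Map(new TopicPartition("t", 1) -> 0L)  // partitions discovered in this batch
val fromOffsets = startOffsets ++ newPartitionInitialOffsets            // both appear in fromOffsets' keySet
```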


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750437
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
--- End diff --

Rewritten. I don't want to rely on this default value of 0, as @jose-torres expressed concern earlier. So I rewrote this to explicitly check whether `minPartitions` has been set or not.
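
A minimal sketch of that explicit check, assuming the DataSourceV2 `DataSourceOptions.get` API of the time (which returns a Java `Optional[String]`); the PR's actual `apply` may differ in detail.

```
import org.apache.spark.sql.sources.v2.DataSourceOptions

// None when "minPartitions" was not supplied, Some(n) when it was
def minPartitionsOption(options: DataSourceOptions): Option[Int] =
  Option(options.get("minPartitions").orElse(null)).map(_.toInt)
```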


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171741765
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
+var remaining = size
+var startOffset = offsetRange.fromOffset
+(0 until parts).map { part =>
+  // Fine to do integer division. Last partition will consume all 
the round off errors
+  val thisPartition = remaining / (parts - part)
+  remaining -= thisPartition
+  val endOffset = startOffset + thisPartition
+  val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, 
preferredLoc = None)
+  startOffset = endOffset
+  offsetRange
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
 

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732779
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -370,8 +361,14 @@ private[kafka010] class KafkaMicroBatchDataReader(
 
   override def close(): Unit = {
 // Indicate that we're no longer using this consumer
--- End diff --

maybe remove this?


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733516
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
--- End diff --

yeah, a comment about how this is calculating the `weight` of partitions to 
assign to this topic would help. In addition, the sum of `parts` after this 
calculation will be `>= minPartitions`
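
For concreteness, a small worked example of that proportional calculation with made-up sizes (not values from the PR):

```
// Skewed partitions: 90, 5, and 5 new offsets respectively; minPartitions = 6
val sizes = Seq(90L, 5L, 5L)
val minPartitions = 6
val totalSize = sizes.sum  // 100

val parts = sizes.map { size =>
  // "weight" of this topic partition: its share of the data scaled to minPartitions,
  // floored at 1 so even tiny partitions still get their own task
  math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
}
// parts == Seq(5, 1, 1); parts.sum == 7, which is >= minPartitions as noted above
```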


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733038
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
--- End diff --

I don't think we need the first check. `offsetRanges.size` should be 
greater than 0 right? Otherwise we wouldn't have called into this.


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732729
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
 }
 
 /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch 
streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
-range: KafkaOffsetRange,
-preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+offsetRange: KafkaOffsetRange,
 executorKafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
-failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+failOnDataLoss: Boolean,
+reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
 
-  override def preferredLocations(): Array[String] = preferredLoc.toArray
+  override def preferredLocations(): Array[String] = 
offsetRange.preferredLoc.toArray
 
   override def createDataReader(): DataReader[UnsafeRow] = new 
KafkaMicroBatchDataReader(
-range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, 
reuseKafkaConsumer)
 }
 
 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming 
query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
 offsetRange: KafkaOffsetRange,
 executorKafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
-failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+failOnDataLoss: Boolean,
+reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with 
Logging {
+
+  private val consumer = {
+if (!reuseKafkaConsumer) {
+  // If we can't reuse CachedKafkaConsumers, creating a new 
CachedKafkaConsumer. As here we
--- End diff --

`nit: We use 'assign' here, hence don't need to ...`


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171733181
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --

was this check here before? What if there are new topic partitions? Are we 
missing those, because they may not exist in fromOffsets?


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732183
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
 // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
 // Otherwise, interrupting a thread while running `KafkaConsumer.poll` 
may hang forever
 // (KAFKA-1894).
-assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --

Assertions can be turned off


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732015
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
 // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
 // Otherwise, interrupting a thread while running `KafkaConsumer.poll` 
may hang forever
 // (KAFKA-1894).
-assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --

Not much really. assert throws AssertionError and require throws IllegalArgumentException. Just a matter of preference. I can revert this change.
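
For reference, a quick sketch of the observable difference, and of why elision matters; not part of the PR:

```scala
import scala.util.Try

// require always runs; failure throws IllegalArgumentException.
println(Try(require(1 > 2, "boom")))
// Failure(java.lang.IllegalArgumentException: requirement failed: boom)

// assert throws AssertionError on failure, but Predef.assert is @elidable, so
// scalac can strip these calls when compiled with a high enough -Xelide-below
// setting; it must not guard behavior the program depends on.
println(Try(assert(1 > 2, "boom")))
// Failure(java.lang.AssertionError: assertion failed: boom)
```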


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171698925
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
--- End diff --

It's hard to understand why this number is being calculated as it is. I 
think it's correct, but a comment explaining why this is the right number to 
divvy would help.
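
For what it's worth, a small worked sketch of the proportional split computed above (the numbers are made up):

```scala
// Suppose minPartitions = 10 and two skewed topic-partitions:
//   tp-0 has 900 new records, tp-1 has 100, so totalSize = 1000.
def parts(size: Long, totalSize: Long, minPartitions: Int): Int =
  math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt

parts(900L, 1000L, 10)  // 9 tasks for tp-0
parts(100L, 1000L, 10)  // 1 task for tp-1
parts(5L, 1000L, 10)    // 1 task -- tiny partitions still get at least one task
```

So each topic-partition gets a share of the roughly-minPartitions Spark tasks proportional to its share of the records, but never fewer than one.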


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171685190
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
+var remaining = size
+var startOffset = offsetRange.fromOffset
+(0 until parts).map { part =>
+  // Fine to do integer division. Last partition will consume all 
the round off errors
+  val thisPartition = remaining / (parts - part)
+  remaining -= thisPartition
+  val endOffset = startOffset + thisPartition
+  val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, 
preferredLoc = None)
+  startOffset = endOffset
+  offsetRange
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
 

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171685001
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
--- End diff --

nit: /s/numPartitions/minPartitions


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171685641
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
--- End diff --

I worry that "always" is misleading here. It's not guaranteed that the same 
executor will run the partition or that the KafkaConsumer can be reused.


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171441133
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
--- End diff --

add docs.


---




[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-02-28 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20698

[SPARK-23541][SS] Allow Kafka source to read data with greater parallelism 
than the number of topic-partitions

## What changes were proposed in this pull request?

Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some cases, it may be beneficial to read the data with greater parallelism, that is, with more partitions/tasks. That means offset ranges must be divided up into smaller ranges such that the number of records per partition ~= total records in batch / desired partitions. This would also balance out any data skew between topic-partitions.

In this patch, I have added a new option called `minPartitions`, which 
allows the user to specify the desired level of parallelism.
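
For illustration, a minimal sketch of how a streaming query could opt in (assumes a SparkSession named `spark`; the server/topic values are made up, and the other options shown are the existing Kafka source options):

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  // Ask for roughly 64 Spark tasks per batch even if topic1 has fewer
  // partitions; large/skewed topic-partitions get split into smaller ranges.
  .option("minPartitions", "64")
  .load()
```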

## How was this patch tested?
New tests in KafkaMicroBatchV2SourceSuite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20698.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20698


commit ebb9b51c51a4411811a7e0e09fff8f8608faa017
Author: Tathagata Das 
Date:   2018-03-01T01:28:32Z

Implemented




---
