[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-06 Thread QuentinAmbard
Github user QuentinAmbard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207802444
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --

Are you suggesting I should create a new KafkaRDD instead, and consume from this RDD to get the last offset range?
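For reference, a minimal sketch of that alternative, assuming the public KafkaUtils.createRDD API; the helper name, the String key/value types, and the surrounding wiring are illustrative only, not part of this patch:

    import java.{util => ju}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

    // Build a KafkaRDD over the candidate ranges and, per partition, keep the
    // offset of the last record actually returned. Because it is a KafkaRDD,
    // tasks get preferred locations and reuse cached consumers.
    def lastOffsetsWithRecords(
        sc: SparkContext,
        kafkaParams: ju.Map[String, Object],
        ranges: Array[OffsetRange]): Map[(String, Int), Long] = {
      val rdd = KafkaUtils.createRDD[String, String](
        sc, kafkaParams, ranges, LocationStrategies.PreferConsistent)
      rdd.mapPartitions { records =>
        var last: Option[ConsumerRecord[String, String]] = None
        records.foreach(r => last = Some(r))
        last.map(r => ((r.topic, r.partition), r.offset)).iterator
      }.collect().toMap
    }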





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721681
  
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+
+class OffsetWithRecordScannerSuite
+  extends SparkFunSuite
+    with Logging {
+
+  class OffsetWithRecordScannerMock[K, V](records: List[Option[ConsumerRecord[K, V]]])
+    extends OffsetWithRecordScanner[K, V](
+      Map[String, Object]("isolation.level" -> "read_committed").asJava, 1, 1, 0.75F, true) {
+    var i = -1
+    override protected def getNext(c: KafkaDataConsumer[K, V]): Option[ConsumerRecord[K, V]] = {
+      i = i + 1
+      records(i)
+    }
+
+  }
+
+  val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap())
+  val tp = new TopicPartition("topic", 0)
+
+  test("Rewinder construction should fail if isolation level isn set to read_committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewinder construction shouldn't fail if isolation level isn't set") {
+    assert(new OffsetWithRecordScanner[String, String](
+      Map[String, Object]().asJava, 1, 1, 0.75F, true) != null)
+  }
+
+  test("Rewinder construction should fail if isolation level isn't set to committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewind should return the proper count.") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), Some(3)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 2)
+    assert(size === 2)
+  }
+
+  test("Rewind should return the proper count with gap") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(3), Some(4), Some(5)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 4)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count for the end of the iterator") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 3)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count missing data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 1)
+    assert(size === 1)
+  }
+
+  test("Rewind should return the proper count without data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 0)
+    assert(size === 0)
+  }
+
+  private def records(offsets: Option[Long]*) = {
+    offsets.map(o => o.map(new ConsumerRecord("topic", 0, _, "k", "v"))).toList
+  }
+}
--- End diff --


[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721657
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala ---
@@ -90,21 +90,23 @@ final class OffsetRange private(
     val topic: String,
     val partition: Int,
     val fromOffset: Long,
-    val untilOffset: Long) extends Serializable {
+    val untilOffset: Long,
+    val recordNumber: Long) extends Serializable {
--- End diff --

Does MiMa actually complain about binary compatibility if you just make recordNumber count?  It's just an accessor either way...

If so, and you have to do this, I'd name this recordCount consistently throughout.  Number could refer to a lot of things that aren't counts.
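To make the distinction concrete, a minimal sketch of the two shapes being compared; the class names and the recordCount field are illustrative, not the actual patch:

    // Shape A: keep the existing constructor and expose the count as a plain
    // accessor derived from the range bounds (only valid while offsets are
    // consecutive).
    final class OffsetRangeA private(
        val topic: String,
        val partition: Int,
        val fromOffset: Long,
        val untilOffset: Long) extends Serializable {
      def recordCount: Long = untilOffset - fromOffset
    }

    // Shape B (what the diff above does): add a constructor parameter that
    // carries the real count, which changes the constructor signature and is
    // what a binary-compatibility check like MiMa may flag.
    final class OffsetRangeB private(
        val topic: String,
        val partition: Int,
        val fromOffset: Long,
        val untilOffset: Long,
        val recordCount: Long) extends Serializable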





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721492
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -191,6 +211,11 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     buffer.previous()
   }
 
+  def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = {
--- End diff --

Is this used anywhere?





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721482
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -183,6 +187,22 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     record
   }
 
+  /**
+   * Similar to compactedStart but will return None if poll doesn't
--- End diff --

Did you mean compactedNext?





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721435
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --

Because this isn't a KafkaRDD, it isn't going to take advantage of preferred locations, which means it's going to create cached consumers on different executors.
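For context, a minimal sketch of what the KafkaRDD-style behavior looks like: an RDD that reports a preferred location per partition, so the scheduler keeps each topic-partition on the executor that already holds its cached consumer. The class and field names here are hypothetical, not part of the patch:

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // One split per topic-partition, remembering the host that should process it.
    case class HostAwareSplit(index: Int, host: String) extends Partition

    class HostAwareRDD(sc: SparkContext, hosts: Seq[String])
      extends RDD[Int](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        hosts.zipWithIndex.map { case (h, i) => HostAwareSplit(i, h) }.toArray

      // Placeholder work; a KafkaRDD would read from its cached consumer here.
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator(split.index)

      // The hook that the sc.parallelize(offsets.toList) call above does not set:
      // it lets the scheduler send each task to the executor on the preferred host.
      override protected def getPreferredLocations(split: Partition): Seq[String] =
        Seq(split.asInstanceOf[HostAwareSplit].host)
    }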





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207437645
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
+        tpos.map { case (tp, o) =>
+          val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
+          (tp, offsetAndCount)
+        }
+      }).collect()
--- End diff --

What exactly is the benefit gained by doing a duplicate read of all the messages?





[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-07-30 Thread QuentinAmbard
GitHub user QuentinAmbard opened a pull request:

https://github.com/apache/spark/pull/21917

[SPARK-24720][STREAMING-KAFKA] add option to align ranges with offset having records to support kafka transaction

## What changes were proposed in this pull request?

This fix adds an option to align the offset ranges of each partition with offsets that actually have records.
To enable this behavior, set spark.streaming.kafka.alignRangesToCommittedTransaction = true.
Note that if many transactions are aborted, multiple 1-second polls might be executed for each partition.
We rewind the partition by spark.streaming.kafka.offsetSearchRewind offsets to search for the last offset with records.
spark.streaming.kafka.offsetSearchRewind should be set greater than the number of records in one typical transaction, depending on the use case (10 by default).
The first rewind is executed at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^1); if no data is found, we retry at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^2), and so on, until we reach FROM_OFFSET.
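For illustration, a minimal sketch of just this exponential rewind, under stated assumptions: hasRecordsFrom is a hypothetical stand-in for seeking to a candidate offset and polling the partition, and rewind plays the role of spark.streaming.kafka.offsetSearchRewind. This is not the patch itself, only the search pattern it describes:

    // Returns the offset from which to start scanning for the last offset that
    // actually holds a record. Never rewinds past FROM_OFFSET.
    def findSearchStart(fromOffset: Long, toOffset: Long, rewind: Long)
                       (hasRecordsFrom: Long => Boolean): Long = {
      var step = rewind                                  // rewind^1 on the first try
      var candidate = math.max(fromOffset, toOffset - step)
      while (candidate > fromOffset && !hasRecordsFrom(candidate)) {
        step *= rewind                                   // rewind^2, rewind^3, ...
        candidate = math.max(fromOffset, toOffset - step)
      }
      candidate
    }

Choosing a rewind larger than a typical transaction makes it likely that the first candidate position already contains records, so only one extra poll is needed.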

## How was this patch tested?

Unit tests for the rewinder. No integration test for transactions, since the Kafka version currently used doesn't support them. Tested against a custom streaming use case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/QuentinAmbard/spark SPARK-24720

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21917.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21917


commit a5b52c94b9f7eaa293d7882bde0fb432ef3fa632
Author: quentin 
Date:   2018-07-30T14:43:56Z

SPARK-24720 add option to align ranges with offset having records to 
support kafka transaction

commit 79d83db0f535fe1e9e5f534a6a0b4fe7c3d6257f
Author: quentin 
Date:   2018-07-30T14:47:33Z

correction indentation

commit 05c7e7fb96806c07bc9b0513ef59fbcdd5ae9118
Author: quentin 
Date:   2018-07-30T14:53:45Z

remove wrong comment edit



