[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user QuentinAmbard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207802444

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --

Are you suggesting I should create a new KafkaRDD instead, and consume from this RDD to get the last offset range?

---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721681

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+
+class OffsetWithRecordScannerSuite
+  extends SparkFunSuite
+    with Logging {
+
+  class OffsetWithRecordScannerMock[K, V](records: List[Option[ConsumerRecord[K, V]]])
+    extends OffsetWithRecordScanner[K, V](
+      Map[String, Object]("isolation.level" -> "read_committed").asJava, 1, 1, 0.75F, true) {
+    var i = -1
+    override protected def getNext(c: KafkaDataConsumer[K, V]): Option[ConsumerRecord[K, V]] = {
+      i = i + 1
+      records(i)
+    }
+
+  }
+
+  val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap())
+  val tp = new TopicPartition("topic", 0)
+
+  test("Rewinder construction should fail if isolation level isn set to read_committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewinder construction shouldn't fail if isolation level isn't set") {
+    assert(new OffsetWithRecordScanner[String, String](
+      Map[String, Object]().asJava, 1, 1, 0.75F, true) != null)
+  }
+
+  test("Rewinder construction should fail if isolation level isn't set to committed") {
+    intercept[IllegalStateException] {
+      new OffsetWithRecordScanner[String, String](
+        Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true)
+    }
+  }
+
+  test("Rewind should return the proper count.") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), Some(3)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 2)
+    assert(size === 2)
+  }
+
+  test("Rewind should return the proper count with gap") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(3), Some(4), Some(5)))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 4)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count for the end of the iterator") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), Some(1), Some(2), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3)
+    assert(offset === 3)
+    assert(size === 3)
+  }
+
+  test("Rewind should return the proper count missing data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(Some(0), None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 1)
+    assert(size === 1)
+  }
+
+  test("Rewind should return the proper count without data") {
+    var scanner = new OffsetWithRecordScannerMock[String, String](
+      records(None))
+    val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2)
+    assert(offset === 0)
+    assert(size === 0)
+  }
+
+  private def records(offsets: Option[Long]*) = {
+    offsets.map(o => o.map(new ConsumerRecord("topic", 0, _, "k", "v"))).toList
+  }
+}
--- End diff --
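The five "Rewind should return the proper count" cases pin down a small contract: walk the polled records from the start offset, stop after the requested number of records or at the first empty poll, and return an exclusive until-offset (one past the last offset that held a record) together with the number of records seen. The sketch below is a hypothetical, Kafka-free model of that contract, not the PR's OffsetWithRecordScanner: the object name ScanModel and the simplified signature, where an Iterator[Option[Long]] stands in for successive getNext calls, are assumptions.

```scala
object ScanModel {
  /**
   * Hypothetical model of the tested behavior: returns (untilOffset, recordCount).
   * Each element of `polled` is one getNext result: Some(offset) or None (empty poll).
   */
  def iterateUntilLastOrEmpty(
      fromOffset: Long,
      polled: Iterator[Option[Long]],
      target: Int): (Long, Long) = {
    val offsets = polled
      .takeWhile(_.isDefined)                  // an empty poll (None) ends the scan
      .collect { case Some(offset) => offset } // unwrap the offsets that hold records
      .take(target)                            // stop once `target` records were seen
      .toList
    // untilOffset is exclusive: one past the last offset that held a record,
    // or fromOffset itself when nothing was read
    val untilOffset = offsets.lastOption.map(_ + 1).getOrElse(fromOffset)
    (untilOffset, offsets.size.toLong)
  }
}
```

Note how the "with gap" case falls out naturally: with offsets 0, 1, 3 consumed, the range ends at 4 but holds only 3 records.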
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721657

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala ---
@@ -90,21 +90,23 @@ final class OffsetRange private(
     val topic: String,
     val partition: Int,
     val fromOffset: Long,
-    val untilOffset: Long) extends Serializable {
+    val untilOffset: Long,
+    val recordNumber: Long) extends Serializable {
--- End diff --

Does MiMa actually complain about binary compatibility if you just make recordNumber count? It's just an accessor either way...

If so, and you have to do this, I'd name this recordCount consistently throughout. Number could refer to a lot of things that aren't counts.
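One way to act on the naming suggestion while keeping existing call sites compiling is to call the new field recordCount and keep the old four-argument factory as an overload that assumes consecutive offsets (so the record count equals untilOffset - fromOffset). OffsetRangeSketch is a hypothetical stand-in, not Spark's OffsetRange, and this sketch makes no claim about whether MiMa would accept the equivalent change to the real class.

```scala
// Hypothetical stand-in for OffsetRange, only to illustrate naming and defaulting.
final class OffsetRangeSketch private (
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long,
    val recordCount: Long) extends Serializable {
  // Number of offsets spanned; equals recordCount only when offsets are consecutive.
  def count(): Long = untilOffset - fromOffset
}

object OffsetRangeSketch {
  // Existing-style factory: offsets assumed consecutive, so recordCount = until - from.
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRangeSketch =
    new OffsetRangeSketch(topic, partition, fromOffset, untilOffset, untilOffset - fromOffset)

  // New overload for ranges whose record count was measured by scanning.
  def create(
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long,
      recordCount: Long): OffsetRangeSketch =
    new OffsetRangeSketch(topic, partition, fromOffset, untilOffset, recordCount)
}
```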
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721492

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -191,6 +211,11 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     buffer.previous()
   }
+  def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = {
--- End diff --

Is this used anywhere?
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721482

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -183,6 +187,22 @@ private[kafka010] class InternalKafkaConsumer[K, V](
     record
   }
+  /**
+   * Similar to compactedStart but will return None if poll doesn't
--- End diff --

Did you mean compactedNext?
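The new doc comment describes a variant of the compacted read path that returns None, rather than failing, when a poll yields no records. A minimal model of that contract follows, assuming a hypothetical `poll` function in place of a seek-and-poll on a real consumer; BufferedPoller and nextOrNone are illustrative names, not the PR's code.

```scala
// Hypothetical model: `poll(offset)` returns the (offset, value) records available from `offset`.
final class BufferedPoller[V](poll: Long => Seq[(Long, V)]) {
  private var buffer: Iterator[(Long, V)] = Iterator.empty
  private var nextOffset: Long = 0L

  /** Reposition the reader and drop any buffered records. */
  def seek(offset: Long): Unit = {
    nextOffset = offset
    buffer = Iterator.empty
  }

  /** Like a compacted next(), but returns None instead of failing on an empty poll. */
  def nextOrNone(): Option[(Long, V)] = {
    if (!buffer.hasNext) buffer = poll(nextOffset).iterator
    if (buffer.hasNext) {
      val record = buffer.next()
      // Offsets may skip (compaction, transaction markers), so follow the record's offset.
      nextOffset = record._1 + 1
      Some(record)
    } else {
      None
    }
  }
}
```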
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207721435

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --

Because this isn't a KafkaRDD, it isn't going to take advantage of preferred locations, which means it's going to create cached consumers on different executors.
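The concern is about scheduling: a KafkaRDD reports a preferred executor for each TopicPartition, so the task for a given partition keeps landing where that partition's cached consumer already lives, while an RDD built with sparkContext.parallelize reports no such preference. The sketch below illustrates the kind of deterministic mapping that makes this work, in the spirit of the PreferConsistent location strategy: hash the partition onto a sorted executor list. ExecutorLoc, PreferConsistentSketch and preferredExecutor are hypothetical names, and Spark's own selection logic may differ in detail.

```scala
final case class ExecutorLoc(host: String, executorId: String)

object PreferConsistentSketch {
  /**
   * Deterministically map a (topic, partition) onto one executor, so that while the
   * executor set is unchanged, every batch reads that partition on the same executor.
   */
  def preferredExecutor(
      topic: String,
      partition: Int,
      executors: Seq[ExecutorLoc]): Option[ExecutorLoc] =
    if (executors.isEmpty) None
    else {
      // Sorting makes the choice independent of the order executors are reported in.
      val sorted = executors.sortBy(e => (e.host, e.executorId))
      // floorMod keeps the index non-negative even when hashCode is negative.
      val index = Math.floorMod((topic, partition).hashCode, sorted.length)
      Some(sorted(index))
    }
}
```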
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207437645

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }.getOrElse(offsets)
   }
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset range. For non consecutive offset the last offset must have record.
+   * If offsets have missing data (transaction marker or abort), increases the
+   * range until we get the requested number of record or no more records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offset were continue
+   * @return (totalNumberOfRecords, updated offset)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
+        tpos.map { case (tp, o) =>
+          val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
+          (tp, offsetAndCount)
+        }
+      }).collect()
--- End diff --

What exactly is the benefit gained by doing a duplicate read of all the messages?
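The scaladoc explains why the extra pass exists: once transaction markers or aborted records occupy offsets, untilOffset - fromOffset no longer equals the number of records in the range, and the true count can only be obtained by reading the range. The hypothetical helpers below make that difference concrete for a range where offset 2 holds a commit marker; they illustrate the cost being questioned, not a justification for it.

```scala
object RecordCountSketch {
  /** Offsets spanned by [from, until): what `until - from` reports. */
  def spannedOffsets(from: Long, until: Long): Long = until - from

  /**
   * Records actually present in [from, until). Knowing `offsetsWithData`
   * requires reading the range, which is the duplicate read in question.
   */
  def presentRecords(from: Long, until: Long, offsetsWithData: Seq[Long]): Long =
    offsetsWithData.count(o => o >= from && o < until).toLong
}
```

For offsets 0, 1, 3 and 4 holding data, the range [0, 5) spans 5 offsets but contains only 4 records.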
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
GitHub user QuentinAmbard opened a pull request:

https://github.com/apache/spark/pull/21917

[SPARK-24720][STREAMING-KAFKA] add option to align ranges with offset having records to support kafka transaction

## What changes were proposed in this pull request?

This fix adds an option to align the range of each partition with offsets that have records.
To enable this behavior, set spark.streaming.kafka.alignRangesToCommittedTransaction = true

Note that if many transactions are aborted, several polls of 1 second each might be executed for each partition.

We rewind each partition by spark.streaming.kafka.offsetSearchRewind offsets to search for the last offset with records.
spark.streaming.kafka.offsetSearchRewind should be set greater than the number of records in one typical transaction, depending on the use case (default: 10).
The first rewind is executed at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^1). If no data is found, we retry at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^2), and so on until we reach FROM_OFFSET.

## How was this patch tested?

Unit tests for the rewinder.
There is no integration test for transactions, since the current Kafka version doesn't support transactions.
Tested against a custom streaming use case.
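The rewind schedule in the description (try TO_OFFSET - R, then TO_OFFSET - R^2, and so on until FROM_OFFSET, where R is spark.streaming.kafka.offsetSearchRewind) can be sketched in plain Scala. This is an illustrative model, not the PR's implementation: RewindSketch, rewindStarts, lastOffsetWithRecord and the `readOffsets` callback, which stands in for polling the partition with read_committed isolation, are hypothetical names.

```scala
object RewindSketch {
  /**
   * Start offsets tried by the search: to - r, to - r^2, to - r^3, ...,
   * with a final attempt at `from` once the window would reach it.
   */
  def rewindStarts(from: Long, to: Long, rewind: Long): List[Long] = {
    require(rewind > 1, "rewind must be > 1 so the window grows")
    val starts = List.newBuilder[Long]
    var step = rewind
    while (step < to - from) {
      starts += to - step
      // Saturate instead of overflowing; the loop then ends because step >= to - from.
      step = if (step > Long.MaxValue / rewind) Long.MaxValue else step * rewind
    }
    starts += from
    starts.result()
  }

  /**
   * Last offset in [from, to) that holds a readable record, or None.
   * `readOffsets(start, to)` is a hypothetical stand-in for polling [start, to)
   * and returning the offsets of the records seen.
   */
  def lastOffsetWithRecord(
      from: Long,
      to: Long,
      rewind: Long,
      readOffsets: (Long, Long) => Seq[Long]): Option[Long] =
    rewindStarts(from, to, rewind).iterator
      .map(start => readOffsets(start, to)) // lazily: stop at the first window with data
      .collectFirst { case offsets if offsets.nonEmpty => offsets.max }
}
```

With R = 10, FROM_OFFSET = 0 and TO_OFFSET = 1000, the windows start at 990, 900 and then 0. This is presumably why R should exceed a typical transaction size: a larger first window is more likely to reach past a trailing aborted transaction in a single poll.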
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QuentinAmbard/spark SPARK-24720

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21917.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21917

commit a5b52c94b9f7eaa293d7882bde0fb432ef3fa632
Author: quentin
Date:   2018-07-30T14:43:56Z

    SPARK-24720 add option to align ranges with offset having records to support kafka transaction

commit 79d83db0f535fe1e9e5f534a6a0b4fe7c3d6257f
Author: quentin
Date:   2018-07-30T14:47:33Z

    correction indentation

commit 05c7e7fb96806c07bc9b0513ef59fbcdd5ae9118
Author: quentin
Date:   2018-07-30T14:53:45Z

    remove wrong comment edit