[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 SPARK-25005 actually has a far better solution for detecting message loss. Will try to apply the same logic... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user QuentinAmbard commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207802444 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala --- @@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V]( }.getOrElse(offsets) } - override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { -val untilOffsets = clamp(latestOffsets()) -val offsetRanges = untilOffsets.map { case (tp, uo) => - val fo = currentOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo) + /** + * Return the offset ranges. For non-consecutive offsets the last offset must hold a record. + * If offsets have missing data (transaction marker or abort), increase the + * range until we get the requested number of records or run out of records. + * Because we have to iterate over all the records in this case, + * we also return the total number of records. + * @param offsets the target range we would like if offsets were continuous + * @return (totalNumberOfRecords, updated offsets) + */ + private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = { +if (nonConsecutive) { + val localRw = rewinder() + val localOffsets = currentOffsets + context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => { --- End diff -- Are you suggesting I should create a new KafkaRDD instead, and consume from this RDD to get the last offset range?
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 > By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because kafka skipped a message. AFAIK from the information you have from a kafka consumer, once you start allowing gaps in offsets, you don't know. Ok, that's interesting: my understanding was that if you successfully poll and get results, you are 100% sure that you don't lose anything. Do you have more details on that? Why would kafka skip a record while consuming? > Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels? endOffsets returns the last offset (same as seekToEnd). But you're right that the easiest solution for us would be to have something like a seekToLastRecord method instead. Maybe something we could ask for as well?
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 If you are doing it in advance you'll change the range: for example, you read until 3 and don't get any extra results. Maybe it's because of a transaction offset, maybe another issue; it's ok in both cases. The big difference is that the next batch will restart from offset 3 and poll from this value. If seeking to 3 and polling gets you another result (for example 6), then everything is fine: it's not data loss, it's just a gap. The issue with your proposal is that seekToEnd gives you the last offset, which might not be the last record. So in your example, if the last offset is 5 and after a few polls the last record you get is 3, what do you do, continue and execute the next batch from 5? How do you know that offset 4 isn't just lost because the poll failed? The only way to know would be to get a record with an offset higher than 5; in that case you know it's just a gap. But if the record you are reading is the last of the topic, you won't get records higher than 3, so you can't tell whether it's a poll failure or an empty offset left by the transaction commit.
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 I'm not sure I understand your point. The cause of the gap doesn't matter; we just want to stop on an existing offset so we can poll it. Whether it's a transaction marker, a transaction abort, or even just a temporary poll failure is not relevant here. The driver is smart enough to restart from any offset, even in the middle of a transaction (aborted or not). The issue with a gap at the end is that you can't know whether it's a gap or the poll failed. For example, seekToEnd gives you 5 but the last record you get is 3, and there is no way to know if 4 is missing or just an offset gap. How could we fix that in a different way?
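The ambiguity discussed above can be made concrete with a small sketch. This is a Python simulation of the reasoning (not code from the PR; the function name `is_gap_confirmed` is hypothetical): a missing offset is provably just a gap only if some record with a higher offset was actually received.

```python
def is_gap_confirmed(polled_offsets, missing_offset):
    """A missing offset is provably an offset gap (transaction marker or
    aborted batch) only if we received a record with a HIGHER offset.
    Otherwise we cannot distinguish a gap from a failed/incomplete poll."""
    return any(o > missing_offset for o in polled_offsets)

# seekToEnd said the end is 5, but the last record polled is 3:
assert not is_gap_confirmed([1, 2, 3], 4)    # ambiguous: gap or data loss?
# a later record at offset 6 proves offset 4 was just a gap:
assert is_gap_confirmed([1, 2, 3, 6], 4)
```

This is why the PR stops ranges on an offset known to hold a record: the proof of "it was only a gap" can then be obtained by the next batch's poll.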
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 With this solution we don't read the data another time "just to support transactions." The current implementation for compacted topics already reads all the messages twice in order to get a correct count for the info tracker: `val inputInfo = StreamInputInfo(id, rdd.count, metadata)` Since RDDs are compacted or have "empty offsets" due to transaction markers/aborts, the only way to get the exact count is to read. The same logic applies to selecting a range: if you want to get N records per partition, the only way to know which "untilOffset" will make you read N records is to read, stop when you've read N records, and take that offset. So one of the advantages is being able to fetch the number of records per partition you really want (for compacted topics and transactions). But the real advantage is that it lets you pick an "untilOffset" that is not empty. For example, if you don't have records for offsets [4, 6]:
```
offset: 1, 2, 3, 4, 5, 6
record: a, b, c,  ,  ,
```
if you use an offset range of [1,5), you will try to read 4 but won't receive any data. In this scenario you can't tell whether the data is simply missing (which is ok) or you lost data because kafka is down (not ok). To deal with this situation, we first scan the offsets and stop at the last offset where we had data; in the example, instead of [1,5) we would go with [1,4), because 3 has data so it's safe to stop there. During the next batch, if we then get extra data, we would select the next range as [4, 8) and won't have any issue:
```
offset: 1, 2, 3, 4, 5, 6, 7
record: a, b, c,  ,  ,  , g
```
Does that make sense? (PS: sorry for the multiple commits, I wasn't running MiMa properly, I'll fix the remaining issue soon)
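The range-shrinking step described above can be sketched in a few lines. This is a Python simulation of the idea, not the PR's Scala code; the function name `align_until_offset` and the dict-based record model are assumptions for illustration.

```python
def align_until_offset(records, desired_until):
    """Return an exclusive until-offset ending right after the last offset
    known to hold a record, so the range never ends on an empty offset.
    `records` maps offset -> value; gaps (markers/aborts) are simply absent."""
    with_data = [o for o in records if o < desired_until]
    if not with_data:
        return None  # nothing polled below the target; keep the old position
    return max(with_data) + 1

records = {1: "a", 2: "b", 3: "c"}          # offsets 4..6 are empty
assert align_until_offset(records, 5) == 4   # [1,5) shrinks to [1,4)
records[7] = "g"                             # next batch sees data past the gap
assert align_until_offset(records, 8) == 8   # [4,8) is safe: 7 has a record
```

The key property is that the returned bound always sits just past a real record, so a later poll from that position can confirm whether any missing offsets in between were only gaps.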
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
GitHub user QuentinAmbard opened a pull request: https://github.com/apache/spark/pull/21917 [SPARK-24720][STREAMING-KAFKA] add option to align ranges with offset having records to support kafka transaction ## What changes were proposed in this pull request? This fix adds an option to align the ranges of each partition with offsets that actually hold records. To enable this behavior, set spark.streaming.kafka.alignRangesToCommittedTransaction = true. Note that if a lot of transactions are aborted, multiple 1-second polls might be executed for each partition. We rewind the partition by spark.streaming.kafka.offsetSearchRewind offsets to search for the last offset with records. spark.streaming.kafka.offsetSearchRewind should be set greater than the number of records in one typical transaction, depending on the use case (10 by default). The first rewind is executed at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^1); if no data is found, we retry at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^2), etc., until we reach FROM_OFFSET. ## How was this patch tested? Unit test for the rewinder. No integration test for transactions since the kafka version currently used doesn't support them. Tested against a custom streaming use-case. 
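The exponential rewind described above (rewind^1, then rewind^2, ... until FROM_OFFSET) generates a sequence of candidate seek positions. A minimal Python sketch of that sequence, assuming the behavior stated in the PR description (the name `rewind_candidates` is hypothetical; the actual Scala rewinder may differ):

```python
def rewind_candidates(from_offset, to_offset, rewind=10):
    """Candidate seek positions when searching backwards for the last
    offset holding a record: TO_OFFSET - rewind^1, then TO_OFFSET - rewind^2,
    ... clamped at FROM_OFFSET once the step overshoots the range start."""
    step, candidates = rewind, []
    while True:
        pos = to_offset - step
        if pos <= from_offset:
            candidates.append(from_offset)  # last resort: scan from the range start
            return candidates
        candidates.append(pos)
        step *= rewind

# with the default rewind of 10, searching back from offset 5000:
assert rewind_candidates(0, 5000) == [4990, 4900, 4000, 0]
```

At each candidate position a poll is attempted; the first one that yields records pins down the last non-empty offset, which is why the rewind size should exceed a typical transaction's record count.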
You can merge this pull request into a Git repository by running: $ git pull https://github.com/QuentinAmbard/spark SPARK-24720 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21917.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21917 commit a5b52c94b9f7eaa293d7882bde0fb432ef3fa632 Author: quentin Date: 2018-07-30T14:43:56Z SPARK-24720 add option to align ranges with offset having records to support kafka transaction commit 79d83db0f535fe1e9e5f534a6a0b4fe7c3d6257f Author: quentin Date: 2018-07-30T14:47:33Z correction indentation commit 05c7e7fb96806c07bc9b0513ef59fbcdd5ae9118 Author: quentin Date: 2018-07-30T14:53:45Z remove wrong comment edit