[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-10-03 Thread QuentinAmbard
Github user QuentinAmbard commented on the issue:

https://github.com/apache/spark/pull/21917
  
SPARK-25005 actually has a far better solution to detect message loss. Will 
try to apply the same logic...


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-08-06 Thread QuentinAmbard
Github user QuentinAmbard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21917#discussion_r207802444
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
 }.getOrElse(offsets)
   }
 
-  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
-    val untilOffsets = clamp(latestOffsets())
-    val offsetRanges = untilOffsets.map { case (tp, uo) =>
-      val fo = currentOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo)
+  /**
+   * Return the offset ranges. For non-consecutive offsets, the last offset must have a record.
+   * If offsets have missing data (transaction marker or abort), increase the
+   * range until we get the requested number of records or run out of records.
+   * Because we have to iterate over all the records in this case,
+   * we also return the total number of records.
+   * @param offsets the target range we would like if offsets were contiguous
+   * @return (totalNumberOfRecords, updated offsets)
+   */
+  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+    if (nonConsecutive) {
+      val localRw = rewinder()
+      val localOffsets = currentOffsets
+      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --

Are you suggesting I should create a new KafkaRDD instead, and consume from this RDD to get the last offset range?
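
If that is the suggestion, here is roughly what I picture — a hypothetical sketch using the public KafkaUtils.createRDD, not code from this patch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Hypothetical sketch of the suggested alternative: materialize a KafkaRDD
// over the tentative ranges, then keep the highest offset actually read
// for each (topic, partition).
def lastOffsetsViaRDD(
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    ranges: Array[OffsetRange]): Map[(String, Int), Long] = {
  KafkaUtils.createRDD[Array[Byte], Array[Byte]](
      sc, kafkaParams, ranges, LocationStrategies.PreferConsistent)
    .map(r => ((r.topic, r.partition), r.offset))
    .reduceByKey((a, b) => math.max(a, b))
    .collect()
    .toMap
}
```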


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-06 Thread QuentinAmbard
Github user QuentinAmbard commented on the issue:

https://github.com/apache/spark/pull/21917
  
> By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because kafka skipped a message. AFAIK from the information you have from a kafka consumer, once you start allowing gaps in offsets, you don't know.

Ok, that's interesting; my understanding was that if you successfully poll 
and get results, you are 100% sure that you haven't lost anything. Do you have 
more details on that? Why would kafka skip a record while consuming?
 
> Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels?

endOffsets returns the last offset (same as seekToEnd). But you're right 
that the easiest solution for us would be to have something like a 
seekToLastRecord method instead. Maybe something we could also ask for?
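
For reference, a minimal sketch of how that comparison could be run, assuming a plain KafkaConsumer (broker address and group id below are placeholders). As far as I understand, with isolation.level=read_committed the broker reports the last stable offset, so any difference should show up there:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch: fetch endOffsets under a given isolation level.
// Broker address and group id are placeholders, not real settings.
def endOffsetsFor(isolation: String, tps: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", s"offset-probe-$isolation")
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("isolation.level", isolation) // "read_committed" or "read_uncommitted"
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try consumer.endOffsets(tps.asJava).asScala.map { case (tp, o) => tp -> o.longValue }.toMap
  finally consumer.close()
}

// Usage sketch: compare the two maps for the same partitions.
// val tp = new TopicPartition("mytopic", 0)
// endOffsetsFor("read_committed", Seq(tp)) vs endOffsetsFor("read_uncommitted", Seq(tp))
```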


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-04 Thread QuentinAmbard
Github user QuentinAmbard commented on the issue:

https://github.com/apache/spark/pull/21917
  
If you are doing it in advance you'll change the range. For example, you 
read until 3 and don't get any extra results: maybe it's because of a 
transaction offset, maybe another issue; it's ok in both cases.
The big difference is that the next batch will restart from offset 3 and 
poll from this value. If seeking to 3 and polling gets you another result (for example 
6), then everything is fine: it's not data loss, it's just a gap.
The issue with your proposal is that seekToEnd gives you the last offset, 
which might not be the last record.
So in your example, if the last offset is 5 and after a few polls the last record 
you get is 3, what do you do: continue and execute the next batch from 5? How do 
you know that offset 4 isn't just lost because the poll failed?
The only way to know would be to get a record with an offset higher 
than 5; in that case you know it's just a gap. 
But if the message you are reading is the last of the topic, you won't have 
records higher than 3, so you can't tell whether it's a poll failure or an empty 
offset left by the transaction commit.
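
To make that rule concrete, a minimal sketch (names are hypothetical, not from the patch): a missing offset below the target is provably a gap only once a record at or beyond the target has been observed.

```scala
// Hypothetical helper, not patch code: `untilOffset` is the exclusive end
// of the range. A missing offset below it is provably a gap only if we
// have seen a record at or beyond it; otherwise it may be data lost to a
// failed poll and we cannot safely skip past it.
def safeToAdvance(lastPolledOffset: Long, untilOffset: Long,
                  sawRecordAtOrBeyond: Boolean): Boolean =
  lastPolledOffset >= untilOffset - 1 || sawRecordAtOrBeyond
```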


---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-04 Thread QuentinAmbard
Github user QuentinAmbard commented on the issue:

https://github.com/apache/spark/pull/21917
  
I'm not sure I understand your point. The cause of the gap doesn't matter; 
we just want to stop on an existing offset so that we can poll it. Whether it's 
a transaction marker, a transaction abort, or even just a temporary 
poll failure is not relevant in this case.
The driver is smart enough to restart from any offset, even in 
the middle of a transaction (aborted or not).
The issue with a gap at the end is that you can't know whether it's a gap or 
whether the poll failed. 
For example, seekToEnd gives you 5 but the last record you get is 3, and 
there is no way to know whether 4 is missing or just an offset gap.
How could we fix that in a different way?



---




[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...

2018-08-03 Thread QuentinAmbard
Github user QuentinAmbard commented on the issue:

https://github.com/apache/spark/pull/21917
  
With this solution we don't read the data another time "just to support 
transactions."
The current implementation for compacted topics already reads all the 
messages twice in order to get a correct count for the input info tracker: `val 
inputInfo = StreamInputInfo(id, rdd.count, metadata)`.
Since RDDs are compacted or have "empty offsets" due to transaction 
markers/aborts, the only way to get the exact count is to read.
The same logic applies to selecting a range: if you want to get N records per 
partition, the only way to know which "untilOffset" will make you read N records 
is to read, stop when you've read N records, and take that offset.
So one advantage is being able to fetch exactly the number of records per 
partition you want (for compacted topics and transactions).
But the real advantage is that it lets you pick an "untilOffset" that is not 
empty.
For example, if you don't have records for offsets [4, 6]:

```
1, 2, 3, 4, 5, 6
a, b, c,  ,  ,
```

if you use an offset range of [1, 5), you will try to read 4 but won't receive 
any data. In this scenario you can't tell whether the data is simply missing (which is ok) or 
you lost data because kafka is down (not ok).

To deal with this situation, we first scan the offsets and stop at the last 
offset where we had data; in the example, instead of [1, 5) we would go with 
[1, 4), because 3 has data so it's safe to stop at 3.
During the next batch, if we then get extra data, we would select the 
next range as [4, 8) and won't have any issue:

```
1, 2, 3, 4, 5, 6, 7
a, b, c,  ,  ,  , g
```

Does that make sense?
(PS: sorry for the multiple commits, I wasn't running MiMa properly; I'll 
fix the remaining issue soon)
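
For illustration, a minimal sketch of that scan, assuming a plain KafkaConsumer already assigned to the partition and a client recent enough for poll(Duration); the helper name and structure are mine, not the patch's implementation:

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch only (hypothetical helper, not the patch's code): scan
// [from, until) and return the last offset that actually has a record,
// so the batch can safely end on an offset we know can be polled again.
// Returns from - 1 if no record was found in the range.
def lastOffsetWithRecord[K, V](
    consumer: KafkaConsumer[K, V],
    tp: TopicPartition,
    from: Long,
    until: Long): Long = {
  consumer.seek(tp, from)
  var last = from - 1
  var done = false
  while (!done) {
    val records = consumer.poll(Duration.ofSeconds(1)).records(tp).asScala
    val inRange = records.map(_.offset).filter(_ < until)
    if (inRange.nonEmpty) last = inRange.max
    // A record at or beyond until - 1 proves anything still missing is a
    // gap; an empty poll means we stop at the last offset seen so far.
    done = records.isEmpty || records.exists(_.offset >= until - 1)
  }
  last
}
```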


---




[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...

2018-07-30 Thread QuentinAmbard
GitHub user QuentinAmbard opened a pull request:

https://github.com/apache/spark/pull/21917

[SPARK-24720][STREAMING-KAFKA] add option to align ranges with offsets having records to support kafka transactions

## What changes were proposed in this pull request?

This fix adds an option to align the range of each partition with offsets 
that actually have records.
To enable this behavior, set 
spark.streaming.kafka.alignRangesToCommittedTransaction = true.
Note that if a lot of transactions are aborted, multiple 1-second polls might 
be executed for each partition. 
We rewind the partition by spark.streaming.kafka.offsetSearchRewind offsets 
to search for the last offset with records. 
spark.streaming.kafka.offsetSearchRewind should be set to more than the number of records 
in one typical transaction, depending on the use case (10 by default).
The first rewind is executed at 
(TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^1); if no data is found, we 
retry at (TO_OFFSET - spark.streaming.kafka.offsetSearchRewind^2), etc., until we 
reach FROM_OFFSET.
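
A minimal sketch of enabling the new options in an application (the two spark.streaming.kafka.* keys are the ones introduced here; the app name and values are illustrative only):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values only; tune offsetSearchRewind to exceed the number
// of records in a typical transaction for your workload.
val conf = new SparkConf()
  .setAppName("kafka-transactions-example") // placeholder app name
  .set("spark.streaming.kafka.alignRangesToCommittedTransaction", "true")
  .set("spark.streaming.kafka.offsetSearchRewind", "50")

val ssc = new StreamingContext(conf, Seconds(5))
```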

## How was this patch tested?

Unit tests for the rewinder. No integration test for transactions, since the 
Kafka version currently used in the tests doesn't support them. Tested against a custom 
streaming use case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/QuentinAmbard/spark SPARK-24720

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21917.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21917


commit a5b52c94b9f7eaa293d7882bde0fb432ef3fa632
Author: quentin 
Date:   2018-07-30T14:43:56Z

SPARK-24720 add option to align ranges with offset having records to 
support kafka transaction

commit 79d83db0f535fe1e9e5f534a6a0b4fe7c3d6257f
Author: quentin 
Date:   2018-07-30T14:47:33Z

correction indentation

commit 05c7e7fb96806c07bc9b0513ef59fbcdd5ae9118
Author: quentin 
Date:   2018-07-30T14:53:45Z

remove wrong comment edit




---
