Github user koeninger commented on the issue:
https://github.com/apache/spark/pull/21917
Recursively creating a Kafka RDD during the creation of a Kafka RDD would need
a base case, but yeah, we'd need some way to get appropriate preferred locations.
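(For illustration only, not part of the PR: one way to give the helper RDD locality hints
comparable to a KafkaRDD's preferred locations, without recursing into another KafkaRDD,
would be SparkContext.makeRDD's overload that takes explicit per-element location hints.
This is a minimal sketch; the hostFor helper is hypothetical and just stands in for
whatever partition-to-executor-host mapping the DStream already maintains.)

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.SparkContext

    // Hypothetical mapping from a Kafka partition to the executor host(s) that
    // should consume it (e.g. the hosts where cached consumers already live).
    def hostFor(tp: TopicPartition): Seq[String] = Seq("executor-host-for-" + tp)

    // Sketch: distribute the target offsets with explicit preferred locations, so
    // the range-alignment work runs where the Kafka consumers are cached, instead
    // of wherever parallelize happens to place it.
    def alignWithLocality(sc: SparkContext, offsets: Map[TopicPartition, Long]): Unit = {
      val withHints = offsets.toSeq.map { case (tp, off) => ((tp, off), hostFor(tp)) }
      sc.makeRDD(withHints)                 // (element, preferredLocations) overload
        .mapPartitions { iter =>
          // consume from Kafka here to find the last offset that has a record
          iter
        }
        .count()
    }

Whether the existing cached-consumer bookkeeping exposes such a host mapping is an
assumption here; the point is only that locality hints can be attached to the helper
RDD without recursing into a full KafkaRDD.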
On Mon, Aug 6, 2018 at 2:58 AM, Quentin Ambard <[email protected]>
wrote:
> *@QuentinAmbard* commented on this pull request.
> ------------------------------
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
> <https://github.com/apache/spark/pull/21917#discussion_r207802444>:
>
> - val fo = currentOffsets(tp)
> - OffsetRange(tp.topic, tp.partition, fo, uo)
> + /**
> +  * Return the offset range. For non-consecutive offsets, the last offset must have a record.
> +  * If offsets have missing data (transaction marker or abort), increase the
> +  * range until we get the requested number of records or there are no more records.
> +  * Because we have to iterate over all the records in this case,
> +  * we also return the total number of records.
> +  * @param offsets the target range we would like if offsets were consecutive
> +  * @return (totalNumberOfRecords, updated offset)
> +  */
> + private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
> +   if (nonConsecutive) {
> +     val localRw = rewinder()
> +     val localOffsets = currentOffsets
> +     context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
>
> Are you suggesting I should create a new KafkaRDD instead, and consume
> from this RDD to get the last offset range?
>