Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/21917

Recursively creating a Kafka RDD during creation of a Kafka RDD would need a base case, but yeah, some way to have appropriate preferred locations.

On Mon, Aug 6, 2018 at 2:58 AM, Quentin Ambard <notificati...@github.com> wrote:

> *@QuentinAmbard* commented on this pull request.
> ------------------------------
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
> <https://github.com/apache/spark/pull/21917#discussion_r207802444>:
>
> -      val fo = currentOffsets(tp)
> -      OffsetRange(tp.topic, tp.partition, fo, uo)
> +    /**
> +     * Return the offset ranges. For non-consecutive offsets the last offset must have a record.
> +     * If offsets have missing data (transaction marker or abort), increase the
> +     * range until we get the requested number of records or run out of records.
> +     * Because we have to iterate over all the records in this case,
> +     * we also return the total number of records.
> +     * @param offsets the target range we would like if offsets were continuous
> +     * @return (totalNumberOfRecords, updated offset)
> +     */
> +    private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
> +      if (nonConsecutive) {
> +        val localRw = rewinder()
> +        val localOffsets = currentOffsets
> +        context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
>
> Are you suggesting I should create a new KafkaRDD instead, and consume
> from this RDD to get the last offset range?
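A minimal sketch of the alignment idea under discussion, assuming a bare KafkaConsumer rather than the PR's rewinder/KafkaRDD machinery: keep consuming a partition past the gaps left by transaction markers or aborted transactions until the requested number of records has been seen, with the log-end offset acting as the base case that stops the extension. The object and helper names and the poll timeout are illustrative assumptions, not the PR's code.

```scala
import java.{util => ju}
import java.time.Duration

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetAlignSketch {

  /**
   * Extend a tentative range starting at `from` until it really contains
   * `wanted` records, skipping gaps left by transaction markers or aborted
   * transactions. Returns (recordCount, untilOffset); untilOffset always
   * sits just past a real record, never in the middle of a gap.
   * (Hypothetical helper, not the PR's alignRanges.)
   */
  def alignRange(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      tp: TopicPartition,
      from: Long,
      wanted: Long): (Long, Long) = {
    consumer.assign(ju.Collections.singletonList(tp))
    consumer.seek(tp, from)
    // The log-end offset is the base case: once the consumer's position
    // reaches it, there is nothing left to read and extension must stop.
    val end: Long = consumer.endOffsets(ju.Collections.singletonList(tp)).get(tp)
    var count = 0L
    var until = from
    var reachedEnd = false
    while (count < wanted && !reachedEnd) {
      // poll(Duration) needs kafka-clients 2.0+; older clients use poll(long).
      val records = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
      records.foreach { r =>
        if (count < wanted) {
          count += 1
          until = r.offset + 1 // one past the last record actually seen
        }
      }
      reachedEnd = consumer.position(tp) >= end
    }
    (count, until)
  }
}
```

Consuming through a freshly built KafkaRDD instead, as suggested above, would add preferred-location scheduling to the same traversal, provided the recursion bottoms out in a base case like the log-end check here.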