Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207437645 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala --- @@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V]( }.getOrElse(offsets) } - override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { - val untilOffsets = clamp(latestOffsets()) - val offsetRanges = untilOffsets.map { case (tp, uo) => - val fo = currentOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo) + /** + * Return the offset ranges. For non-consecutive offsets, the last offset must have a record. + * If the offsets have missing data (transaction markers or aborted records), increase the + * range until we get the requested number of records or there are no more records. + * Because we have to iterate over all the records in this case, + * we also return the total number of records. + * @param offsets the target range we would like if the offsets were continuous + * @return (totalNumberOfRecords, updated offsets) + */ + private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = { + if (nonConsecutive) { + val localRw = rewinder() + val localOffsets = currentOffsets + context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => { + tpos.map { case (tp, o) => + val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o) + (tp, offsetAndCount) + } + }).collect() --- End diff -- What exactly is the benefit gained by doing a duplicate read of all the messages?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org