Github user QuentinAmbard commented on a diff in the pull request:
https://github.com/apache/spark/pull/21917#discussion_r207802444
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
}.getOrElse(offsets)
}
- override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
- val untilOffsets = clamp(latestOffsets())
- val offsetRanges = untilOffsets.map { case (tp, uo) =>
- val fo = currentOffsets(tp)
- OffsetRange(tp.topic, tp.partition, fo, uo)
+ /**
+ * Return the offset range. For non-consecutive offsets the last offset must have a record.
+ * If offsets have missing data (transaction marker or abort), increase the
+ * range until we get the requested number of records or no more records.
+ * Because we have to iterate over all the records in this case,
+ * we also return the total number of records.
+ * @param offsets the target range we would like if offsets were consecutive
+ * @return (totalNumberOfRecords, updated offset)
+ */
+ private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+ if (nonConsecutive) {
+ val localRw = rewinder()
+ val localOffsets = currentOffsets
+ context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
--- End diff --
Are you suggesting I should create a new KafkaRDD instead, and consume from
this RDD to get the last offset range?
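
For readers following along, here is a minimal, hypothetical sketch of the
range-alignment idea the new scaladoc describes: keep reading past missing
offsets (transaction markers, aborted records) until the requested number of
records has actually been seen. It uses the plain Kafka consumer API rather
than the PR's rewinder()/KafkaRDD machinery, and all names, parameters and
timeouts are illustrative, not code from this pull request.

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object RangeAlignmentSketch {
  /**
   * Read from `fromOffset` until `wanted` records have actually been returned,
   * or until the partition stops yielding data. With transactional producers
   * some offsets are commit/abort markers and never surface as records, so the
   * exclusive end offset of the aligned range can exceed fromOffset + wanted.
   * Returns (records actually read, exclusive end offset).
   */
  def extendUntilEnoughRecords(
      consumer: KafkaConsumer[Array[Byte], Array[Byte]],
      tp: TopicPartition,
      fromOffset: Long,
      wanted: Long): (Long, Long) = {
    consumer.assign(ju.Collections.singletonList(tp))
    consumer.seek(tp, fromOffset)
    var count = 0L
    var untilOffset = fromOffset
    var exhausted = false
    while (count < wanted && !exhausted) {
      val records = consumer.poll(500L).records(tp).asScala
      if (records.isEmpty) {
        exhausted = true // nothing more to read for now
      } else {
        records.iterator.takeWhile(_ => count < wanted).foreach { r =>
          count += 1
          untilOffset = r.offset() + 1 // exclusive end of the aligned range
        }
      }
    }
    (count, untilOffset)
  }
}
```

In the diff above, this kind of alignment appears to run inside mapPartitions
over the target offsets, so each topic-partition's range can be aligned on the
executors before the final KafkaRDD is built.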
---