Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21917#discussion_r207721435
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
         }.getOrElse(offsets)
       }
     
    -  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    -    val untilOffsets = clamp(latestOffsets())
    -    val offsetRanges = untilOffsets.map { case (tp, uo) =>
    -      val fo = currentOffsets(tp)
    -      OffsetRange(tp.topic, tp.partition, fo, uo)
    +  /**
    +   * Return the offset ranges. For non-consecutive offsets the last offset must have a record.
    +   * If offsets have missing data (transaction marker or abort), increase the
    +   * range until we get the requested number of records or no more records.
    +   * Because we have to iterate over all the records in this case,
    +   * we also return the total number of records.
    +   * @param offsets the target range we would like if offsets were consecutive
    +   * @return (totalNumberOfRecords, updated offset)
    +   */
    +  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
    +    if (nonConsecutive) {
    +      val localRw = rewinder()
    +      val localOffsets = currentOffsets
    +      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
    --- End diff --
    
    Because this isn't a KafkaRDD, it isn't going to take advantage of preferred locations, which means it's going to create cached consumers on different executors.
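
    For context, KafkaRDD reports a preferred location per partition so the same TopicPartition keeps landing on the same executor and its cached consumer is reused across batches; an RDD built with `parallelize` reports no preferred locations. A minimal sketch of that idea follows (hypothetical class and parameter names, not the PR's code and not the actual KafkaRDD implementation):

    ```scala
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical partition type: one Kafka TopicPartition per Spark partition.
    case class TopicPartitionPartition(index: Int, tp: TopicPartition) extends Partition

    // Sketch of an RDD that, in the spirit of KafkaRDD with PreferConsistent,
    // always schedules the same TopicPartition on the same executor so the
    // cached consumer created for it can be reused.
    class ConsistentLocationRDD(
        sc: SparkContext,
        topicPartitions: Array[TopicPartition],
        executors: Array[String]) // sorted executor host list, assumed known up front
      extends RDD[TopicPartition](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        topicPartitions.zipWithIndex.map { case (tp, i) =>
          TopicPartitionPartition(i, tp): Partition
        }

      // Hash each TopicPartition to a fixed executor. An RDD from parallelize()
      // returns Nil here, so its tasks (and the cached consumers they create)
      // can land on any executor.
      override def getPreferredLocations(split: Partition): Seq[String] = {
        val tp = split.asInstanceOf[TopicPartitionPartition].tp
        if (executors.isEmpty) Seq.empty
        else Seq(executors(Math.floorMod(tp.hashCode, executors.length)))
      }

      override def compute(split: Partition, context: TaskContext): Iterator[TopicPartition] =
        Iterator(split.asInstanceOf[TopicPartitionPartition].tp)
      }
    ```

    This loosely mirrors what KafkaRDD does under the consistent location strategy; without it, each batch may end up creating a fresh cached consumer for the same partition on a different executor.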


---
