GitHub user koeninger commented on the issue:

    https://github.com/apache/spark/pull/21917
  
    Recursively creating a Kafka RDD during creation of a Kafka RDD would need
    a base case, but yeah, some way to have appropriate preferred locations.
    
    On Mon, Aug 6, 2018 at 2:58 AM, Quentin Ambard <notificati...@github.com>
    wrote:
    
    > *@QuentinAmbard* commented on this pull request.
    > ------------------------------
    >
    > In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
    > DirectKafkaInputDStream.scala
    > <https://github.com/apache/spark/pull/21917#discussion_r207802444>:
    >
    > > -      val fo = currentOffsets(tp)
    > -      OffsetRange(tp.topic, tp.partition, fo, uo)
    > +  /**
    > +   * Return the offset range. For non consecutive offset the last offset must have record.
    > +   * If offsets have missing data (transaction marker or abort), increases the
    > +   * range until we get the requested number of record or no more records.
    > +   * Because we have to iterate over all the records in this case,
    > +   * we also return the total number of records.
    > +   * @param offsets the target range we would like if offset were continue
    > +   * @return (totalNumberOfRecords, updated offset)
    > +   */
    > +  private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
    > +    if (nonConsecutive) {
    > +      val localRw = rewinder()
    > +      val localOffsets = currentOffsets
    > +      context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
    >
    > Are you suggesting I should create a new kafkaRDD instead, and consume
    > from this RDD to get the last offset range?


