Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170279150
  
    --- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
    @@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V](
         }.toArray
       }
     
    -  override def count(): Long = offsetRanges.map(_.count).sum
    +  override def count(): Long =
    +    if (compacted) {
    +      super.count()
    +    } else {
    +      offsetRanges.map(_.count).sum
    +    }
     
       override def countApprox(
           timeout: Long,
           confidence: Double = 0.95
    -  ): PartialResult[BoundedDouble] = {
    -    val c = count
    -    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    -  }
    +  ): PartialResult[BoundedDouble] =
    +    if (compacted) {
    +      super.countApprox(timeout, confidence)
    +    } else {
    +      val c = count
    +      new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    +    }
     
    -  override def isEmpty(): Boolean = count == 0L
    +  override def isEmpty(): Boolean =
    +    if (compacted) {
    +      super.isEmpty()
    +    } else {
    +      count == 0L
    +    }
     
    -  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
    -    val nonEmptyPartitions = this.partitions
    -      .map(_.asInstanceOf[KafkaRDDPartition])
    -      .filter(_.count > 0)
    +  override def take(num: Int): Array[ConsumerRecord[K, V]] =
    +    if (compacted) {
    +      super.take(num)
    +    } else {
    +      val nonEmptyPartitions = this.partitions
    +        .map(_.asInstanceOf[KafkaRDDPartition])
    +        .filter(_.count > 0)
     
    -    if (num < 1 || nonEmptyPartitions.isEmpty) {
    -      return new Array[ConsumerRecord[K, V]](0)
    -    }
    +      if (num < 1 || nonEmptyPartitions.isEmpty) {
    --- End diff --
    
    I guess you could check `num < 1` before the map/filter, but it's trivial.
    You could write `return Array.empty[ConsumerRecord[K,V]]` too; again 
trivial.
    Since this is existing code I could see not touching it as well.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to