Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r170279150
--- Diff:
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
---
@@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V](
}.toArray
}
- override def count(): Long = offsetRanges.map(_.count).sum
+ override def count(): Long =
+ if (compacted) {
+ super.count()
+ } else {
+ offsetRanges.map(_.count).sum
+ }
override def countApprox(
timeout: Long,
confidence: Double = 0.95
- ): PartialResult[BoundedDouble] = {
- val c = count
- new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
- }
+ ): PartialResult[BoundedDouble] =
+ if (compacted) {
+ super.countApprox(timeout, confidence)
+ } else {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
- override def isEmpty(): Boolean = count == 0L
+ override def isEmpty(): Boolean =
+ if (compacted) {
+ super.isEmpty()
+ } else {
+ count == 0L
+ }
- override def take(num: Int): Array[ConsumerRecord[K, V]] = {
- val nonEmptyPartitions = this.partitions
- .map(_.asInstanceOf[KafkaRDDPartition])
- .filter(_.count > 0)
+ override def take(num: Int): Array[ConsumerRecord[K, V]] =
+ if (compacted) {
+ super.take(num)
+ } else {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
- if (num < 1 || nonEmptyPartitions.isEmpty) {
- return new Array[ConsumerRecord[K, V]](0)
- }
+ if (num < 1 || nonEmptyPartitions.isEmpty) {
--- End diff --
You could check `num < 1` before the map/filter to avoid the unnecessary
work, and you could write `return Array.empty[ConsumerRecord[K, V]]`
instead of `new Array[ConsumerRecord[K, V]](0)` — but both changes are trivial.
Since this is pre-existing code, leaving it untouched is also reasonable.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]