GitHub user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/3120#discussion_r20260893
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -252,7 +258,7 @@ class HadoopRDD[K, V](
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
- inputMetrics.bytesRead = bytesReadFn()
+ inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
--- End diff --
Yeah, I was proposing changing bytesReadFn() so that it instead returns the
incremental bytes read.
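The incremental-callback idea can be sketched as follows. This is a minimal sketch, assuming a per-reader callback that reports cumulative bytes (which is how `bytesReadFn()` appears to behave in the diff, given that `bytesReadAtStart` is added to it). `SimpleInputMetrics` and `incremental` are hypothetical names used only for illustration, not Spark APIs. Each reader gets its own wrapper, and the shared metric is updated with `+=` instead of `=`, so interleaved readers no longer overwrite each other.

```scala
// Hypothetical, simplified stand-in for Spark's input metrics (not the real class).
final class SimpleInputMetrics {
  var bytesRead: Long = 0L
}

// Hypothetical helper: wraps a callback that reports *cumulative* bytes read
// so that each call returns only the bytes read since the previous call.
def incremental(cumulative: () => Long): () => Long = {
  var last = 0L
  () => {
    val now = cumulative()
    val delta = now - last
    last = now
    delta
  }
}

// Two hypothetical readers, each with its own cumulative byte counter.
var bytesA = 0L
var bytesB = 0L
val deltaA = incremental(() => bytesA)
val deltaB = incremental(() => bytesB)
val metrics = new SimpleInputMetrics

// Interleave reads from both readers and accumulate (+=) into the shared metric.
bytesA += 100; metrics.bytesRead += deltaA() // adds 100
bytesB += 40;  metrics.bytesRead += deltaB() // adds 40
bytesB += 60;  metrics.bytesRead += deltaB() // adds 60
bytesA += 50;  metrics.bytesRead += deltaA() // adds 50
println(metrics.bytesRead)                   // 250
```

Because each wrapper remembers only its own last value, the final total no longer depends on which reader happened to write last.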
I looked into this more, and it looks like the scenario I proposed does
happen with CartesianRDD.scala. I think we should fix this, both so that the
input bytes metric is correct for CartesianRDD.scala and to prevent future bugs
(a very minor change in CoalescedRDD.scala could make the bug occur there too).
To be slightly more specific about the issue, look at CartesianRDD.scala,
for example
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala#L75):
first, an iterator is created for each parent RDD. If both parent RDDs are
HadoopRDDs, they will both have bytesReadAtStart = 0. As getNext() is called
on each RDD (via the for loop I linked to), inputMetrics.bytesRead keeps
getting clobbered, so the final value reflects only the bytes read from one
parent RDD rather than the bytes read from both parents. To re-create this
problem:
> sc.parallelize(1 to 2).saveAsTextFile("file:///<yourpath>/tester1")
> val a = sc.textFile("file:///<yourpath>/tester1")
> sc.parallelize(1 to 10).saveAsTextFile("file:///<yourpath>/tester2")
> val b = sc.textFile("file:///<yourpath>/tester2")
> a.cartesian(b).count()
> b.cartesian(a).count()
The last two calls report different input sizes, and one of them is smaller
than the size of file b alone, so it is clearly wrong.
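The clobbering can be reproduced in a small toy model. This sketch assumes, as the comment does, that both parents start with bytesReadAtStart = 0, and for simplicity it updates the metric after every record rather than periodically. `SimpleInputMetrics`, `clobberingIterator`, `reportedBytes`, and the record sizes are hypothetical. The nested for-comprehension builds a fresh inner iterator for each outer record, so the inner parent is read once per outer record; whichever reader writes last determines the reported value.

```scala
// Hypothetical, simplified stand-in for Spark's input metrics (not the real class).
final class SimpleInputMetrics { var bytesRead: Long = 0L }

// Each call models one HadoopRDD-style reader that *assigns* (not adds to)
// the shared metric, so it overwrites whatever the other parent wrote.
def clobberingIterator(recordSizes: Seq[Long], metrics: SimpleInputMetrics): Iterator[Long] = {
  val bytesReadAtStart = 0L // both parents see 0, as described above
  var ownBytes = 0L
  recordSizes.iterator.map { size =>
    ownBytes += size
    metrics.bytesRead = bytesReadAtStart + ownBytes // clobbers the other parent's count
    size
  }
}

// Returns (reported bytes, bytes actually read) for outer.cartesian(inner).
def reportedBytes(outer: Seq[Long], inner: Seq[Long]): (Long, Long) = {
  val metrics = new SimpleInputMetrics
  val pairs = for (x <- clobberingIterator(outer, metrics);
                   y <- clobberingIterator(inner, metrics)) yield (x, y)
  pairs.foreach(_ => ()) // drain the iterator, as an action would
  val actual = outer.sum + outer.size * inner.sum
  (metrics.bytesRead, actual)
}

val aSizes = Seq(2L, 2L)      // hypothetical record sizes for file a
val bSizes = Seq.fill(10)(3L) // hypothetical record sizes for file b (30 bytes total)

val (reportedAB, actualAB) = reportedBytes(aSizes, bSizes)
val (reportedBA, actualBA) = reportedBytes(bSizes, aSizes)
println(s"a.cartesian(b): reported=$reportedAB actual=$actualAB") // reported=30 actual=64
println(s"b.cartesian(a): reported=$reportedBA actual=$actualBA") // reported=4 actual=70
```

In this model the two orders report different values, and the b-outer order reports less than file b's 30 bytes, which mirrors the symptom described in the comment.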