Github user sryza commented on a diff in the pull request:
https://github.com/apache/spark/pull/4067#discussion_r23884482
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
---
@@ -241,21 +242,22 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
-
- private val _bytesRead: AtomicLong = new AtomicLong()
+
+ @volatile @transient var bytesReadCallback: Option[() => Long] = None
/**
* Total bytes read.
*/
+ private val _bytesRead: AtomicLong = new AtomicLong()
def bytesRead: Long = _bytesRead.get()
- @volatile @transient var bytesReadCallback: Option[() => Long] = None
+ def incBytesRead(bytes: Long) = _bytesRead.addAndGet(bytes)
/**
- * Adds additional bytes read for this read method.
+ * Total records read.
*/
- def addBytesRead(bytes: Long) = {
- _bytesRead.addAndGet(bytes)
- }
+ def recordsRead: Long = _recordsRead.get()
+ private val _recordsRead: AtomicLong = new AtomicLong()
--- End diff --
This will only ever be modified by a single thread, right? If so, wouldn't it
be better to make it a @volatile var rather than an AtomicLong?
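
For illustration, here is a rough sketch of the two options being compared. The class names `AtomicCounter` and `VolatileCounter` are hypothetical and are not part of Spark. The sketch assumes exactly one thread ever writes the counter while other threads may read it.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins for the counter inside InputMetrics; not Spark code.

// Option 1: AtomicLong. Increments are atomic, so this is safe even with
// several writer threads.
class AtomicCounter {
  private val _recordsRead = new AtomicLong()
  def recordsRead: Long = _recordsRead.get()
  def incRecordsRead(records: Long): Unit = _recordsRead.addAndGet(records)
}

// Option 2: @volatile var. Writes are visible to reader threads, but the
// increment is a non-atomic read-modify-write, so this is only correct
// under the assumption of a single writer thread.
class VolatileCounter {
  @volatile private var _recordsRead: Long = 0L
  def recordsRead: Long = _recordsRead
  def incRecordsRead(records: Long): Unit = _recordsRead += records
}
```

With one writer, both report the same totals. With more than one writer, the volatile version could lose updates, which is the condition the question above hinges on.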