cloud-fan commented on a change in pull request #27021: [SPARK-30362][Core] Update InputMetrics in DataSourceRDD
URL: https://github.com/apache/spark/pull/27021#discussion_r372938259
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
##########
@@ -80,3 +70,65 @@ class DataSourceRDD(
castPartition(split).inputPartition.preferredLocations()
}
}
+
+/**
+ * Adapts a [[PartitionReader]] to a Scala [[Iterator]].
+ *
+ * `reader.next()` returns a Boolean that is cached in `valuePrepared`, so repeated `hasNext`
+ * calls do not call `reader.next()` again until `next()` has consumed the value via
+ * `reader.get()`. `next()` throws `NoSuchElementException` once the reader reports no more data.
+ */
+private class PartitionIterator[T](reader: PartitionReader[T]) extends
Iterator[T] {
+ // True when reader.next() has reported a value that next() has not yet returned.
+ private[this] var valuePrepared = false
+
+ override def hasNext: Boolean = {
+ // Only ask the reader again once the previously reported value was consumed.
+ if (!valuePrepared) {
+ valuePrepared = reader.next()
+ }
+ valuePrepared
+ }
+
+ override def next(): T = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ // Mark the value as consumed so the following hasNext calls reader.next() again.
+ valuePrepared = false
+ reader.get()
+ }
+}
+
+/**
+ * Updates the current task's input metrics (records read and bytes read).
+ *
+ * NOTE(review): `TaskContext.get()` is dereferenced without a null check, so this presumably
+ * must only be instantiated while a task is running on the current thread — confirm at call sites.
+ */
+private class MetricsHandler extends Logging with Serializable {
+ private val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
+ // Bytes already recorded before this handler existed; the callback's value is added on top.
+ private val startingBytesRead = inputMetrics.bytesRead
+ // Presumably reports filesystem bytes read by this thread since the callback was created — verify.
+ private val getBytesRead =
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+ /**
+ * Adds `numRows` to the records-read count, then refreshes bytes read when the running
+ * total is an exact multiple of `UPDATE_INPUT_METRICS_INTERVAL_RECORDS`, or when `force` is set.
+ *
+ * NOTE(review): when `numRows > 1` (e.g. columnar batches) the running total can step over
+ * every multiple of the interval without landing on one. The periodic bytes-read refresh may
+ * then never fire, and only `force = true` updates it. Consider tracking records since the
+ * last refresh instead of testing the total with `%`.
+ */
+ def updateMetrics(numRows: Int, force: Boolean = false): Unit = {
+ inputMetrics.incRecordsRead(numRows)
+ val shouldUpdateBytesRead =
+ inputMetrics.recordsRead %
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0
+ if (shouldUpdateBytesRead || force) {
+ inputMetrics.setBytesRead(startingBytesRead + getBytesRead())
+ }
+ }
+}
+
+/**
+ * Wraps an iterator and reports input metrics for it.
+ *
+ * Each element returned by `next()` counts as one record read. When the wrapped iterator is
+ * exhausted, `hasNext` forces a bytes-read refresh. This happens on every `hasNext` call after
+ * exhaustion, not only the first.
+ */
+private class MetricsIterator[I](iter: Iterator[I]) extends Iterator[I] {
+ // Created eagerly with this iterator; `protected` so subclasses can reuse the same handler.
+ protected val metricsHandler = new MetricsHandler
+
+ override def hasNext: Boolean = {
+ if (iter.hasNext) {
+ true
+ } else {
+ // Input exhausted: publish final bytes-read without adding any records.
+ metricsHandler.updateMetrics(0, force = true)
+ false
+ }
+ }
+
+ override def next(): I = {
+ val item = iter.next
+ // One element corresponds to one record here.
+ metricsHandler.updateMetrics(1)
+ item
+ }
+}
+
+private class MetricsBatchIterator(
Review comment:
nit: to be consistent, let's have a `MetricsRowIterator` (mirroring `MetricsBatchIterator`), and then the base class `MetricsIterator` doesn't need to implement `next`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]