sunchao commented on a change in pull request #33239:
URL: https://github.com/apache/spark/pull/33239#discussion_r672830123
##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
##########
@@ -62,4 +63,13 @@ default BatchWrite toBatch() {
  default StreamingWrite toStreaming() {
    throw new UnsupportedOperationException(description() + ": Streaming write is not supported");
  }
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns an empty array.
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    CustomMetric[] NO_METRICS = {};
+    return NO_METRICS;
Review comment:
ditto
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -276,7 +279,12 @@ case class OverwritePartitionsDynamicExec(
case class WriteToDataSourceV2Exec(
batchWrite: BatchWrite,
refreshCache: () => Unit,
-    query: SparkPlan) extends V2TableWriteExec {
+    query: SparkPlan,
+    writeMetrics: Seq[CustomMetric]) extends V2TableWriteExec {
+
+  override val customMetrics = writeMetrics.map { customMetric =>
Review comment:
nit: add a type annotation to this public member of the class
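Something like the following, perhaps — just a sketch, assuming the metrics end up as a name-keyed map of `SQLMetric`s built with `SQLMetrics.createV2CustomMetric` (the exact value type should match whatever `V2TableWriteExec` declares):
```scala
// Sketch only: the Map[String, SQLMetric] type and the trailing .toMap are
// assumptions about the surrounding code, not the PR as written.
override val customMetrics: Map[String, SQLMetric] = writeMetrics.map { customMetric =>
  customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
}.toMap
```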
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -79,9 +81,15 @@ abstract class FileFormatDataWriter(
  /** Write an iterator of records. */
  def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
+    var count = 0L
Review comment:
could we have something like:
```scala
def writeWithMetrics(record: InternalRow): Unit = {
  if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
    CustomMetrics.updateMetrics(currentMetricsValues, customMetrics)
  }
  count += 1
  write(record)
}
```
and then update all the call sites to use it, instead of replicating the code in several places?
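With that helper, each call site would reduce to a plain loop; roughly (assuming `count` becomes a private member of the writer so it persists across calls, plus a final flush so the last partial batch is reported):
```scala
def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
  while (iterator.hasNext) {
    writeWithMetrics(iterator.next())
  }
  // Final update so the metrics reflect rows written since the last flush.
  CustomMetrics.updateMetrics(currentMetricsValues, customMetrics)
}
```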
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
##########
@@ -292,6 +300,10 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
  def refreshCache: () => Unit
  def write: Write
+  override val customMetrics = write.supportedCustomMetrics().map { customMetric =>
Review comment:
ditto
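Same sketch as above, with the type spelled out (again assuming the name-keyed `SQLMetric` map):
```scala
// Sketch only; mirror whatever type the other write exec nodes use.
override val customMetrics: Map[String, SQLMetric] =
  write.supportedCustomMetrics().map { customMetric =>
    customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
  }.toMap
```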
##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
##########
@@ -104,4 +105,13 @@
   * @throws IOException if failure happens during disk/network IO like writing files.
   */
  void abort() throws IOException;
+
+  /**
+   * Returns an array of custom task metrics. By default it returns an empty array. Note that it
+   * is not recommended to put heavy logic in this method as it may affect writing performance.
+   */
+  default CustomTaskMetric[] currentMetricsValues() {
+    CustomTaskMetric[] NO_METRICS = {};
+    return NO_METRICS;
Review comment:
nit: can we just do it in one line: `return new CustomTaskMetric[]{};`?