[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236720141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- Done in 0348ae5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236646423 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- ``` do you mean we may leave the metrics empty when creating ShuffledRowRDD in tests? ``` Yes, like we did in `UnsafeRowSerializerSuite`. ``` I don't think we need to consider this case since ShuffledRowRDD is a private API ``` Got it. I searched for `new ShuffledRowRDD` across the source code and `UnsafeRowSerializerSuite` is the only place it appears, so I'll change the test and delete the default value of `metrics` in this commit.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236637264 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- do you mean we may leave the `metrics` empty when creating `ShuffledRowRDD` in tests?
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236636819 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- I don't think we need to consider this case since `ShuffledRowRDD` is a private API. If we do need to consider it, we also need to take care if users pass in a `metrics` that is invalid.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236251210 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -82,6 +82,14 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 + val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" + val REMOTE_BYTES_READ = "remoteBytesRead" + val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk" + val LOCAL_BYTES_READ = "localBytesRead" + val FETCH_WAIT_TIME = "fetchWaitTime" + val RECORDS_READ = "recordsRead" --- End diff -- Thanks for your advice, Wenchen. I tried to `sync this list with ShuffleReadMetrics` locally and have the following comments: 1. It's easy to sync SQLMetrics with `ShuffleReadMetrics` when the task has only one shuffle reader: just call `ShuffleMetricsReporter.incXXX`. 2. But for multiple shuffle readers in a single task, we need to add `setXXX` functions to the `ShuffleMetricsReporter` trait, because we need to reset the SQLMetrics after `setMergeValues` is called by every shuffle reader. 3. I also tried to achieve this without changing `ShuffleMetricsReporter`, for example by calling `ShuffleMetricsReporter.incXXX` on the driver side at task end, but that may not be a good approach. If you think it makes sense to change the `ShuffleMetricsReporter` trait, I'll push a commit soon.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236108610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -82,6 +82,14 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 + val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" + val REMOTE_BYTES_READ = "remoteBytesRead" + val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk" + val LOCAL_BYTES_READ = "localBytesRead" + val FETCH_WAIT_TIME = "fetchWaitTime" + val RECORDS_READ = "recordsRead" --- End diff -- Is there an easy way to sync this list with `ShuffleReadMetrics` instead of doing it manually?
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236032855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.executor.TempShuffleReadMetrics + +/** + * A shuffle metrics reporter for SQL exchange operators. + * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. + * @param metrics All metrics in current SparkPlan. + */ +class SQLShuffleMetricsReporter( + tempMetrics: TempShuffleReadMetrics, + metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { + + override def incRemoteBlocksFetched(v: Long): Unit = { +metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v) --- End diff -- Sorry for not considering the per-row operations here; I should have been more careful. Fixed in cb46bfe.
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236025838 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.executor.TempShuffleReadMetrics + +/** + * A shuffle metrics reporter for SQL exchange operators. + * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. + * @param metrics All metrics in current SparkPlan. + */ +class SQLShuffleMetricsReporter( + tempMetrics: TempShuffleReadMetrics, + metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { + + override def incRemoteBlocksFetched(v: Long): Unit = { +metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v) --- End diff -- (I'm not referring to just this function, but in general, especially for per-row).
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236025817 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.metric + +import org.apache.spark.executor.TempShuffleReadMetrics + +/** + * A shuffle metrics reporter for SQL exchange operators. + * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. + * @param metrics All metrics in current SparkPlan. + */ +class SQLShuffleMetricsReporter( + tempMetrics: TempShuffleReadMetrics, + metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { + + override def incRemoteBlocksFetched(v: Long): Unit = { +metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v) --- End diff -- Doing a hashmap lookup here could introduce serious performance regressions.
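The concern about a per-call hashmap lookup can be illustrated with a minimal sketch. It does not reproduce the code in the pull request: `SimpleMetric` is a hypothetical stand-in for `SQLMetric`, and `LookupPerCallReporter`, `CachedReporter` and `RecordsRead` are illustrative names, so the example runs without Spark on the classpath. The first reporter resolves the metric through the map on every increment, as in the diff quoted above; the second resolves it once at construction, so the hot path is a plain field access.

```scala
object MetricLookupSketch {
  // Hypothetical stand-in for SQLMetric: a mutable counter with an add method.
  final class SimpleMetric {
    private var total: Long = 0L
    def add(v: Long): Unit = total += v
    def value: Long = total
  }

  // Illustrative metric key; the diff defines similar constants in SQLMetrics.
  val RecordsRead = "recordsRead"

  // Resolves the metric through the map on every call.
  // If this method runs once per row, the lookup cost is paid once per row.
  final class LookupPerCallReporter(metrics: Map[String, SimpleMetric]) {
    def incRecordsRead(v: Long): Unit = metrics(RecordsRead).add(v)
  }

  // Resolves the metric once when the reporter is constructed.
  // Each later increment only touches the cached reference.
  final class CachedReporter(metrics: Map[String, SimpleMetric]) {
    private val recordsRead: SimpleMetric = metrics(RecordsRead)
    def incRecordsRead(v: Long): Unit = recordsRead.add(v)
  }

  def main(args: Array[String]): Unit = {
    val metrics = Map(RecordsRead -> new SimpleMetric)
    val reporter = new CachedReporter(metrics)
    // Simulate a per-row increment over a small batch of rows.
    (1 to 1000).foreach(_ => reporter.incRecordsRead(1L))
    println(metrics(RecordsRead).value)
  }
}
```

Both reporters update the same underlying counter, so the observable metric values are identical; only the work done per call differs. Whether the difference is measurable depends on how often the increment runs, which is why the review singles out per-row operations.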