[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/23207 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239995006 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private lazy val writeMetrics = +SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + private lazy val readMetrics = +SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) --- End diff -- Yea that was done and revert in https://github.com/apache/spark/pull/23207/commits/7d104ebe854effb3d8ceb63cd408b9749cee1a8a, will separate to another pr after this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239990986 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private lazy val writeMetrics = +SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + private lazy val readMetrics = +SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) --- End diff -- I feel it is better to rename SQLShuffleMetricsReporter to SQLShuffleReadMetricsReporter to make it match with SQLShuffleWriteMetricsReporter. It can be in a followup. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239748512 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a --- End diff -- Ah, I think I know your meaning, yea, after we passing context, more things can be done in this interface, I'll delete this comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239744840 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { --- End diff -- Yea, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239744767 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- Thanks, there's no back and forth, thanks for your advise and help all along Wenchen. :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239743452 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a --- End diff -- Not stale, maybe I didn't express clearly, here I want to express is we always return a proxy reporter like currently SQLShuffleWriteReporter, it's not only update self metrics(local accumulator) but also the exists reporter passing in(like metrics in context). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239736660 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- `private val NS_TIMING_METRIC = "nsTiming"` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- Actually I think your previous naming is good, sorry for the back and forth. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735425 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- maybe `nsToMsTiming`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735015 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { --- End diff -- this can be protected? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239734920 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a --- End diff -- `always return a proxy reporter for both local accumulator and original reporter updating` is it stale? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698500 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NS_TIMING_METRIC = "nanosecond" --- End diff -- How about naming it as `NORMALIZE_TIMING_METRIC`, maybe it can be reused later for other timing metric which need normalize unit. If you think its strange name I'll change back. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698273 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -333,8 +343,19 @@ object ShuffleExchangeExec { new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), -serializer) +serializer, +shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) dependency } + + /** + * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter + * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. + */ + def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { +(reporter: ShuffleWriteMetricsReporter) => { --- End diff -- Yes it can't work with Scala 2.11, should write in more readable, done in 6378a3d. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239698174 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter --- End diff -- Copy, the trait can be added when we need more customization for SQL shuffle. Done in 6378a3d. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677846 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NS_TIMING_METRIC = "nanosecond" --- End diff -- Can we change it to `ms`? The core side can still be `ns`, but in SQL side we truncate it into `ms`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677653 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -333,8 +343,19 @@ object ShuffleExchangeExec { new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), -serializer) +serializer, +shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) dependency } + + /** + * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter + * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. + */ + def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { +(reporter: ShuffleWriteMetricsReporter) => { --- End diff -- does this work with Scala 2.11? maybe we don't need to be that fancy and just write ``` new ShuffleWriteProcessor { xxx } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677477 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter --- End diff -- after it, we can just make `ShuffleWriteProcessor` a class --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677325 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter --- End diff -- how about `def createMetricsReporter(context: TaskContext)`? Then in core it's implemented as ``` context.taskMetrics().shuffleWriteMetrics ``` and in SQL ``` new SQLShuffle.Reporter(context.taskMetrics().shuffleWriteMetrics) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239548704 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) --- End diff -- Reimplement done in a780b70. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239312090 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))), + 1L -> (("Exchange", Map( +"shuffle records written" -> 2L, +"records read" -> 2L, +"local blocks fetched" -> 2L, --- End diff -- Copy, the display text will be done in another pr. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311564 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) --- End diff -- As our discussion here https://github.com/apache/spark/pull/23207#discussion_r238909822, The latest approach choose to carry a function of (reporter => reporter) in shuffle dependency to create SQLShuffleWriteMetrics in ShuffleMapTask. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) --- End diff -- Both should be private lazy val(also newly added readMetrics), I'll change them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239311018 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + override lazy val metrics = --- End diff -- Thanks, make sense, I'll change to separate both read/write metrics and pass them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239308829 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))), + 1L -> (("Exchange", Map( +"shuffle records written" -> 2L, +"records read" -> 2L, +"local blocks fetched" -> 2L, --- End diff -- yea i'd just change the display text here, and not change the api --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239308706 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { + @transient private[this] lazy val _bytesWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[this] lazy val _recordsWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[this] lazy val _writeTime = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { +metricsReporter.incBytesWritten(v) +_bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +metricsReporter.incRecordsWritten(v) +_recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { +metricsReporter.incWriteTime(v) +_writeTime.add(v) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" --- End diff -- yea i think we can just report ms level granularity. no point reporting ns (although we might want to measure based on ns) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239308197 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) --- End diff -- why are there two parameter list here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239308082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) --- End diff -- why is metrics lazy val and this one val? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239308007 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + override lazy val metrics = --- End diff -- this is somewhat confusing. I'd create a variable for the read metrics so you can pass just that into the ShuffledRDD. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239090244 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { + @transient private[this] lazy val _bytesWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[this] lazy val _recordsWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[this] lazy val _writeTime = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { +metricsReporter.incBytesWritten(v) +_bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +metricsReporter.incRecordsWritten(v) +_recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { +metricsReporter.incWriteTime(v) +_writeTime.add(v) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" --- End diff -- cc @rxin , do you think we should change this metric to use ms as well? In all the places that read/write it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239069014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { + @transient private[this] lazy val _bytesWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[this] lazy val _recordsWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[this] lazy val _writeTime = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { +metricsReporter.incBytesWritten(v) +_bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +metricsReporter.incRecordsWritten(v) +_recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { +metricsReporter.incWriteTime(v) +_writeTime.add(v) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" --- End diff -- Just this shuffle write time in this PR. The left one of time metrics is `fetch wait time`, it's in ms set in `ShuffleBlockFetcherIterator`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239067552 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -163,6 +171,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString + } else if (metricsType == NS_TIMING_METRIC) { +duration => Utils.msDurationToString(duration / 1000 / 1000) --- End diff -- Maybe it's ok, as I test this locally with UT in SQLMetricsSuites, result below: ``` shuffle records written: 2 shuffle write time total (min, med, max): 37 ms (37 ms, 37 ms, 37 ms) shuffle bytes written total (min, med, max): 66.0 B (66.0 B, 66.0 B, 66.0 ``` In the actual scenario the shuffle bytes written will be more larger, and keep the time to ms maybe enough, WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239060606 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( +metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { + @transient private[this] lazy val _bytesWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[this] lazy val _recordsWritten = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[this] lazy val _writeTime = +metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { +metricsReporter.incBytesWritten(v) +_bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +metricsReporter.incRecordsWritten(v) +_recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { +metricsReporter.incWriteTime(v) +_writeTime.add(v) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +metricsReporter.decBytesWritten(v) +_bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" --- End diff -- do we have other time metrics using nanoseconds? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239059162 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -163,6 +171,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString + } else if (metricsType == NS_TIMING_METRIC) { +duration => Utils.msDurationToString(duration / 1000 / 1000) --- End diff -- will this string lose the nanosecond precision? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239054315 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value") // Assume the execution plan is - // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0) + // Project(nodeId = 0) + // +- ShuffledHashJoin(nodeId = 1) + // :- Exchange(nodeId = 2) + // : +- Project(nodeId = 3) + // : +- LocalTableScan(nodeId = 4) + // +- Exchange(nodeId = 5) + // +- Project(nodeId = 6) + // +- LocalTableScan(nodeId = 7) val df = df1.join(df2, "key") testSparkPlanMetrics(df, 1, Map( 1L -> (("ShuffledHashJoin", Map( "number of output rows" -> 2L, - "avg hash probe (min, med, max)" -> "\n(1, 1, 1)" + "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))), +2L -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), --- End diff -- For most scenario the answer is yes, but like sort merge join cases, 2 sort node reuse same child will make shuffle records written/records read different, I also add cases in here: https://github.com/xuanyuanking/spark/blob/SPARK-26193/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala#L217-L222 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239050549 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))), + 1L -> (("Exchange", Map( +"shuffle records written" -> 2L, +"records read" -> 2L, +"local blocks fetched" -> 2L, --- End diff -- I agree "fetch" is a more code name in `ShuffleBlockFetcherIterator`, but do you mean just change the display in ui? Cause there's many place even api.scala use the name `localBlocksFetched`, change them all maybe not a good choice for code backport, WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049398 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -163,6 +171,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString + } else if (metricsType == NANO_TIMING_METRIC) { +duration => Utils.msDurationToString(duration / 10) --- End diff -- Sorry...Sorry for this, change it to `1000 / 1000` as other place do for safety. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049121 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +78,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NANO_TIMING_METRIC = "nanosecond" --- End diff -- Done in cf35b9f. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239049030 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit } + + +/** + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleWriteMetricsReporter( +reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter { + override private[spark] def incBytesWritten(v: Long): Unit = { +reporters.foreach(_.incBytesWritten(v)) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +reporters.foreach(_.decRecordsWritten(v)) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +reporters.foreach(_.incRecordsWritten(v)) + } + override private[spark] def incWriteTime(v: Long): Unit = { +reporters.foreach(_.incWriteTime(v)) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +reporters.foreach(_.decBytesWritten(v)) + } +} + + +/** + * A proxy class of ShuffleReadMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleReadMetricsReporter( --- End diff -- Got it, thanks for your guidance, revert to old approach and just little changes for `SQLShuffleReadMetricsReporter` which followed https://github.com/apache/spark/pull/23147. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239048356 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit } + + +/** + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleWriteMetricsReporter( --- End diff -- Thanks for your guidance Reynold and Wenchen, I choose the second implementation, it takes account of both less heavy option and similar use patten as `SQLShuffleReadMetricsReporter`. Done in cf35b9f. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238909822 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit } + + +/** + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleWriteMetricsReporter( --- End diff -- For the write metrics, it's different. It's the default one calls the SQL one, which needs to hack the default one to register external reporters. Maybe we should not change the read side, just create a special `PairShuffleWriteMetricsReporter` to update both the SQL reporter and default reporter. Another idea is, `ShuffleDependency` carries a `reporter => reporter` function, instead of a reporter. Then we can create a SQL reporter which takes another reporter(similar to read side), and put the SQL reporter's constructor in `ShuffleDependency`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238845399 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value") // Assume the execution plan is - // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0) + // Project(nodeId = 0) + // +- ShuffledHashJoin(nodeId = 1) + // :- Exchange(nodeId = 2) + // : +- Project(nodeId = 3) + // : +- LocalTableScan(nodeId = 4) + // +- Exchange(nodeId = 5) + // +- Project(nodeId = 6) + // +- LocalTableScan(nodeId = 7) val df = df1.join(df2, "key") testSparkPlanMetrics(df, 1, Map( 1L -> (("ShuffledHashJoin", Map( "number of output rows" -> 2L, - "avg hash probe (min, med, max)" -> "\n(1, 1, 1)" + "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))), +2L -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), --- End diff -- is this always going to be the same as "shuffle records written" ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238845029 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))), + 1L -> (("Exchange", Map( +"shuffle records written" -> 2L, +"records read" -> 2L, +"local blocks fetched" -> 2L, --- End diff -- i think we should be consistent and name these "read", rather than "fetch". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238843017 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -163,6 +171,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString + } else if (metricsType == NANO_TIMING_METRIC) { +duration => Utils.msDurationToString(duration / 10) --- End diff -- is this the right conversion from nanosecs to millisecs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238842276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +78,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NANO_TIMING_METRIC = "nanosecond" --- End diff -- ns --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238837000 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit } + + +/** + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleWriteMetricsReporter( +reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter { + override private[spark] def incBytesWritten(v: Long): Unit = { +reporters.foreach(_.incBytesWritten(v)) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { +reporters.foreach(_.decRecordsWritten(v)) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { +reporters.foreach(_.incRecordsWritten(v)) + } + override private[spark] def incWriteTime(v: Long): Unit = { +reporters.foreach(_.incWriteTime(v)) + } + override private[spark] def decBytesWritten(v: Long): Unit = { +reporters.foreach(_.decBytesWritten(v)) + } +} + + +/** + * A proxy class of ShuffleReadMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleReadMetricsReporter( --- End diff -- Again - I think your old approach is much better. No point creating a general util when there is only one implementation without any known future needs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238836448 --- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala --- @@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit } + + +/** + * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input + * reporters. + */ +private[spark] class GroupedShuffleWriteMetricsReporter( --- End diff -- I'd not create a general API here. Just put one in SQL similar to the read side that also calls the default one. It can be expensive to go through a seq for each record and bytes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238732441 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L +// Register the shuffle write metrics reporter to shuffleWriteMetrics. +if (dep.shuffleWriteMetricsReporter.isDefined) { + context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( --- End diff -- Cool! That's a more cleaner implementation on consistency for both read and write metrics reporter, also read metrics can extend `ShuffleReadMetricsReporter` directly. Done in ca6c407 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238633725 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L +// Register the shuffle write metrics reporter to shuffleWriteMetrics. +if (dep.shuffleWriteMetricsReporter.isDefined) { + context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( --- End diff -- a simpler idea: 1. create a `class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter`, which proxy all the metrics updating to the input reporters. 2. create a `GroupedShuffleWriteMetricsReporter` instance here: `new GroupedShuffleWriteMetricsReporter(Seq(dep.shuffleWriteMetricsReporter.get, context.taskMetrics().shuffleWriteMetrics))`, and pass it to `manager.getWriter` I think we can use the same approach for read metrics as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238630996 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L +// Register the shuffle write metrics reporter to shuffleWriteMetrics. +if (dep.shuffleWriteMetricsReporter.isDefined) { + context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( --- End diff -- This happens per-task, I think `ShuffleWriteMetrics.externalReporters` can be option instead of array. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238630981 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L +// Register the shuffle write metrics reporter to shuffleWriteMetrics. +if (dep.shuffleWriteMetricsReporter.isDefined) { + context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( --- End diff -- This happens per-task, I think `ShuffleWriteMetrics.externalReporters` can be option instead of array. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/23207 [SPARK-26193][SQL] Implement shuffle write metrics in SQL ## What changes were proposed in this pull request? 1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`. 2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create corresponding `SQLShuffleWriteMetricsReporter` in shuffle dependency. 3. Expand current `ShuffleWriteMetrics` in context as a proxy, register the shuffle write metrics reporter to it during ShuffleMapTask is created on executor. ## How was this patch tested? Add UT in SQLMetricsSuite. Manually test locally, update screen shot to document attached in JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-26193 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23207.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23207 commit 6b26c629439973045da77f7bcd4b852afe8ebd8b Author: Yuanjian Li Date: 2018-12-02T12:19:55Z Commit for fist time success commit a8a1225837419c99a3d9941046a2ca6b501f6dc8 Author: Yuanjian Li Date: 2018-12-03T12:06:34Z Simplify implement by add metrics in ShuffleExchangeExec commit 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb Author: Yuanjian Li Date: 2018-12-03T12:40:41Z code clean and comments --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org