[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236720141
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // metrics here could be empty cause user can use ShuffledRowRDD directly,
--- End diff --

Done in 0348ae5.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236646423
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // metrics here could be empty cause user can use ShuffledRowRDD directly,
--- End diff --

```
do you mean we may leave the metrics empty when creating ShuffledRowRDD in tests?
```
Yes, like we did in `UnsafeRowSerializerSuite`.
```
I don't think we need to consider this case since ShuffledRowRDD is a private API
```
Got it. After searching for `new ShuffledRowRDD` across the source code, `UnsafeRowSerializerSuite` is the only place that uses it, so I'll change the test and delete the default value of `metrics` in this commit. A hypothetical sketch of the change follows.
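For reference, a hypothetical sketch of what the test-side change could look like (the actual `UnsafeRowSerializerSuite` edit may differ): build an explicit metrics map and pass it to the constructor instead of relying on a default value. The two-argument constructor shape, the `dependency` value, and `spark` are assumptions taken from this discussion, not committed code.

```scala
// Hypothetical sketch; `dependency` is the ShuffleDependency the existing
// test already constructs, and `spark` is the suite's SparkSession.
val readMetrics: Map[String, SQLMetric] = Map(
  SQLMetrics.RECORDS_READ ->
    SQLMetrics.createMetric(spark.sparkContext, "records read"))
val shuffledRDD = new ShuffledRowRDD(dependency, readMetrics)
```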


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236637264
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // metrics here could be empty cause user can use ShuffledRowRDD directly,
--- End diff --

do you mean we may leave the `metrics` empty when creating `ShuffledRowRDD` in tests?


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236636819
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // metrics here could be empty cause user can use ShuffledRowRDD directly,
--- End diff --

I don't think we need to consider this case since `ShuffledRowRDD` is a private API. If we do need to consider it, we would also need to handle users passing in a `metrics` map that is invalid, e.g. one missing expected keys; a hypothetical sketch follows.
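For illustration only, a hedged sketch of the extra validation that would be needed if external callers could supply their own `metrics` map; the object and its `requiredKeys` list are hypothetical and not part of this PR.

```scala
// Hypothetical guard, only to illustrate the concern above: fail fast if a
// caller-supplied metrics map is missing any shuffle read metric key.
object ShuffleMetricsValidation {
  private val requiredKeys = Seq(
    SQLMetrics.REMOTE_BLOCKS_FETCHED,
    SQLMetrics.LOCAL_BLOCKS_FETCHED,
    SQLMetrics.RECORDS_READ)

  def validate(metrics: Map[String, SQLMetric]): Unit = {
    val missing = requiredKeys.filterNot(metrics.contains)
    require(missing.isEmpty, s"Missing shuffle read metrics: ${missing.mkString(", ")}")
  }
}
```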


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236251210
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
--- End diff --

Thanks for your advice, Wenchen. I tried syncing this list with `ShuffleReadMetrics` locally and have the following notes:
1. It's easy to sync `SQLMetrics` with `ShuffleReadMetrics` while the task has only one shuffle reader: just call `ShuffleMetricsReporter.incXXX` to achieve this.
2. But for multiple shuffle readers in a single task, we need to add `setXXX` functions to the `ShuffleMetricsReporter` trait, because we need to reset the `SQLMetrics` after `setMergeValues` is called by every shuffle reader (see the sketch after this list).
3. I also tried to achieve this without changing `ShuffleMetricsReporter`, e.g. by calling `ShuffleMetricsReporter.incXXX` on the driver side at task end, but that is probably not a good way.

If you think it makes sense to change the `ShuffleMetricsReporter` trait, I'll push a commit soon.
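For illustration, a minimal sketch of the trait change proposed in point 2. The `setXXX` methods are hypothetical additions; only the `incXXX` style exists in the current trait.

```scala
// Hypothetical sketch: setter counterparts to the existing incXXX methods,
// so each shuffle reader in a task can overwrite the merged value after
// setMergeValues instead of accumulating into it.
trait SettableShuffleMetricsReporter {
  // Existing accumulate-style API (names mirror ShuffleReadMetrics).
  def incRecordsRead(v: Long): Unit
  def incFetchWaitTime(v: Long): Unit

  // Proposed reset-style API needed when one task has several readers.
  def setRecordsRead(v: Long): Unit
  def setFetchWaitTime(v: Long): Unit
  // ...and likewise for the remaining fields listed in SQLMetrics.
}
```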


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236108610
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
--- End diff --

Is there an easy way to sync this list with `ShuffleReadMetrics` instead of doing it manually?


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236032855
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan.
+ */
+class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
+  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v)
--- End diff --

Sorry for not considering the per-row operation cost here; I should have been more careful. Fix done in cb46bfe; a sketch of the direction follows.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236025838
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan.
+ */
+class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
+  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v)
--- End diff --

(I’m not referring to just this function, but in general, especially for per-row).


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236025817
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan.
+ */
+class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
+  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v)
--- End diff --

Doing a hashmap lookup here could introduce serious performance regressions.


---
