[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23207


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239995006
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

Yea, that was done and reverted in 
https://github.com/apache/spark/pull/23207/commits/7d104ebe854effb3d8ceb63cd408b9749cee1a8a;
I will separate it into another PR after this one.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239990986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

I feel it is better to rename SQLShuffleMetricsReporter to 
SQLShuffleReadMetricsReporter to make it match with 
SQLShuffleWriteMetricsReporter. It can be in a followup.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239748512
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

Ah, I think I see what you mean. Yea, after we pass in the context, more things 
can be done through this interface; I'll delete this comment.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239744840
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(context: TaskContext): 
ShuffleWriteMetricsReporter = {
--- End diff --

Yea, thanks.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239744767
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

Thanks, no worries about the back and forth; thanks for your advice and help all 
along, Wenchen. :)


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239743452
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

It's not stale; maybe I didn't express it clearly. What I want to say here is that 
we always return a proxy reporter, like the current SQLShuffleWriteMetricsReporter: 
it updates not only its own metrics (the local accumulators) but also the existing 
reporter passed in (like the metrics in the task context).
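
For illustration, a minimal, self-contained sketch of that proxy pattern, using 
simplified stand-in types rather than Spark's actual `ShuffleWriteMetricsReporter` 
and `SQLMetric` classes (all names below are illustrative only):

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified stand-in for Spark's ShuffleWriteMetricsReporter.
trait WriteMetricsReporter {
  def incRecordsWritten(v: Long): Unit
  def incBytesWritten(v: Long): Unit
}

// Proxy reporter: every update goes both to the SQL-local metrics and to the
// original reporter passed in (e.g. the one backed by the task context).
class ProxyWriteMetricsReporter(
    localRecords: AtomicLong,
    localBytes: AtomicLong,
    delegate: WriteMetricsReporter) extends WriteMetricsReporter {

  override def incRecordsWritten(v: Long): Unit = {
    delegate.incRecordsWritten(v) // keep the task-level metrics correct
    localRecords.addAndGet(v)     // and update the SQL-local metric
  }

  override def incBytesWritten(v: Long): Unit = {
    delegate.incBytesWritten(v)
    localBytes.addAndGet(v)
  }
}
```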


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239736660
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

`private val NS_TIMING_METRIC = "nsTiming"`


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

Actually I think your previous naming is good, sorry for the back and forth.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735425
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

maybe `nsToMsTiming`?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735015
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(context: TaskContext): 
ShuffleWriteMetricsReporter = {
--- End diff --

this can be protected?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239734920
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

`always return a proxy reporter for both local accumulator and original 
reporter updating`

is it stale?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NS_TIMING_METRIC = "nanosecond"
--- End diff --

How about naming it `NORMALIZE_TIMING_METRIC`? Maybe it can be reused later for 
other timing metrics that need unit normalization. If you think the name is 
strange, I'll change it back.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698273
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
   new ShuffleDependency[Int, InternalRow, InternalRow](
 rddWithPartitionIds,
 new PartitionIdPassthrough(part.numPartitions),
-serializer)
+serializer,
+shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
 
 dependency
   }
+
+  /**
+   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the 
default metrics reporter
+   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for 
[[ShuffleWriteProcessor]].
+   */
+  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): 
ShuffleWriteProcessor = {
+(reporter: ShuffleWriteMetricsReporter) => {
--- End diff --

Yes, it doesn't work with Scala 2.11; I should write it in a more readable way. 
Done in 6378a3d.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698174
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

Copy that; the trait can be added when we need more customization for the SQL 
shuffle. Done in 6378a3d.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677846
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NS_TIMING_METRIC = "nanosecond"
--- End diff --

Can we change it to `ms`? The core side can still be `ns`, but on the SQL side 
we truncate it to `ms`.
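
As a rough sketch of what that would mean (illustrative only, not the actual 
`SQLMetrics` code): the accumulator keeps nanoseconds, and only the rendered 
string truncates to milliseconds.

```scala
// Hypothetical helper: render a duration accumulated in nanoseconds as a
// millisecond string, so the core side can keep ns precision internally.
object NsTimingFormat {
  private val NanosPerMilli = 1000L * 1000L

  def toMsString(durationNs: Long): String = s"${durationNs / NanosPerMilli} ms"
}

// Example: 37,123,456 ns is displayed as "37 ms".
// NsTimingFormat.toMsString(37123456L)
```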


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677653
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
   new ShuffleDependency[Int, InternalRow, InternalRow](
 rddWithPartitionIds,
 new PartitionIdPassthrough(part.numPartitions),
-serializer)
+serializer,
+shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
 
 dependency
   }
+
+  /**
+   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the 
default metrics reporter
+   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for 
[[ShuffleWriteProcessor]].
+   */
+  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): 
ShuffleWriteProcessor = {
+(reporter: ShuffleWriteMetricsReporter) => {
--- End diff --

does this work with Scala 2.11? maybe we don't need to be that fancy and 
just write
```
new ShuffleWriteProcessor {
  xxx
}
```
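
For illustration, a sketch of what that explicit form could look like, using 
simplified stand-in types instead of Spark's private `ShuffleWriteProcessor` and 
the SQL reporter wrapper (names here are hypothetical). Spelling out the anonymous 
class avoids relying on SAM conversion, which Scala 2.11 does not apply here:

```scala
// Simplified stand-ins for the Spark types under discussion.
trait WriteMetricsReporter
class SqlProxyReporter(delegate: WriteMetricsReporter) extends WriteMetricsReporter

trait WriteProcessor extends Serializable {
  def createMetricsReporter(reporter: WriteMetricsReporter): WriteMetricsReporter
}

object SqlShuffleWrite {
  // Explicit anonymous class rather than a lambda, so this also compiles on
  // Scala 2.11 (no SAM conversion is applied to this trait there).
  def createShuffleWriteProcessor(): WriteProcessor = new WriteProcessor {
    override def createMetricsReporter(
        reporter: WriteMetricsReporter): WriteMetricsReporter = {
      // Wrap the default reporter with the SQL-side proxy.
      new SqlProxyReporter(reporter)
    }
  }
}
```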


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677477
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

After that, we can just make `ShuffleWriteProcessor` a class.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677325
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

how about `def createMetricsReporter(context: TaskContext)`?

Then in core it's implemented as
```
context.taskMetrics().shuffleWriteMetrics
```

and in SQL
```
new SQLShuffle.Reporter(context.taskMetrics().shuffleWriteMetrics)
```
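
A minimal sketch of that shape, again with simplified stand-in types for 
`TaskContext` and the reporters (illustrative only; the real method would return 
the task's shuffle write metrics in core and a SQL wrapper around them in the 
exchange operator):

```scala
// Simplified stand-ins for TaskContext / TaskMetrics and the reporters.
trait WriteMetricsReporter
final class TaskMetricsStub(val shuffleWriteMetrics: WriteMetricsReporter)
final class TaskContextStub(val taskMetrics: TaskMetricsStub)
class SqlProxyReporter(delegate: WriteMetricsReporter) extends WriteMetricsReporter

// Core: a concrete class whose default reporter is just the task's own
// shuffle write metrics.
class WriteProcessor extends Serializable {
  def createMetricsReporter(context: TaskContextStub): WriteMetricsReporter =
    context.taskMetrics.shuffleWriteMetrics
}

object SqlShuffleWrite {
  // SQL: override the default to wrap the task reporter with the SQL proxy.
  val writeProcessor: WriteProcessor = new WriteProcessor {
    override def createMetricsReporter(
        context: TaskContextStub): WriteMetricsReporter =
      new SqlProxyReporter(context.taskMetrics.shuffleWriteMetrics)
  }
}
```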


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239548704
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
--- End diff --

Reimplementation done in a780b70.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239312090
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

Copy that; the display text change will be done in another PR.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311564
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
--- End diff --

As per our discussion here 
https://github.com/apache/spark/pull/23207#discussion_r238909822, the latest 
approach carries a (reporter => reporter) function in the shuffle dependency to 
create the SQL shuffle write metrics reporter in ShuffleMapTask.
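
A rough, self-contained sketch of that intermediate design, with hypothetical 
stand-in types (elsewhere in this thread the signature evolves to take the 
TaskContext via a ShuffleWriteProcessor instead):

```scala
// Stand-ins for the reporter types.
trait WriteMetricsReporter
class SqlWriteReporter(delegate: WriteMetricsReporter) extends WriteMetricsReporter

// The dependency carries a (reporter => reporter) function instead of a reporter.
final class ShuffleDependencyStub(
    val wrapReporter: WriteMetricsReporter => WriteMetricsReporter)

object Example {
  // Driver side (exchange operator): put the SQL reporter's constructor into
  // the dependency.
  def buildDependency(): ShuffleDependencyStub =
    new ShuffleDependencyStub(reporter => new SqlWriteReporter(reporter))

  // Executor side (ShuffleMapTask): apply it to the task's default reporter.
  def reporterFor(
      default: WriteMetricsReporter,
      dep: ShuffleDependencyStub): WriteMetricsReporter =
    dep.wrapReporter(default)
}
```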


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
--- End diff --

Both should be `private lazy val` (as should the newly added readMetrics); I'll 
change them.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  override lazy val metrics =
--- End diff --

Thanks, makes sense. I'll change it to separate the read and write metrics and 
pass them individually.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239308829
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

yea i'd just change the display text here, and not change the api


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239308706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

yea i think we can just report ms level granularity. no point reporting ns 
(although we might want to measure based on ns)


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239308197
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
--- End diff --

why are there two parameter lists here?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239308082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
--- End diff --

why is `metrics` a lazy val and this one a val?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239308007
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  override lazy val metrics =
--- End diff --

this is somewhat confusing. I'd create a variable for the read metrics so 
you can pass just that into the ShuffledRDD.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239090244
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

cc @rxin, do you think we should change this metric to use ms as well, in all 
the places that read/write it?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239069014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

Just this shuffle write time in this PR. The remaining time metric is 
`fetch wait time`, which is in ms and set in `ShuffleBlockFetcherIterator`.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239067552
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NS_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

Maybe it's OK. I tested this locally with the UT in SQLMetricsSuite, result below:
```
shuffle records written: 2
shuffle write time total (min, med, max): 37 ms (37 ms, 37 ms, 37 ms)
shuffle bytes written total (min, med, max): 66.0 B (66.0 B, 66.0 B, 66.0 B)
```
In a real scenario the shuffle bytes written will be much larger, and keeping the 
time at ms granularity is probably enough. WDYT?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239060606
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

do we have other time metrics using nanoseconds?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239059162
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NS_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

will this string lose the nanosecond precision?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239054315
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
   val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", 
"value")
   // Assume the execution plan is
-  // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
+  // Project(nodeId = 0)
+  // +- ShuffledHashJoin(nodeId = 1)
+  // :- Exchange(nodeId = 2)
+  // :  +- Project(nodeId = 3)
+  // : +- LocalTableScan(nodeId = 4)
+  // +- Exchange(nodeId = 5)
+  // +- Project(nodeId = 6)
+  // +- LocalTableScan(nodeId = 7)
   val df = df1.join(df2, "key")
   testSparkPlanMetrics(df, 1, Map(
 1L -> (("ShuffledHashJoin", Map(
   "number of output rows" -> 2L,
-  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"
+  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+2L -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
--- End diff --

For most scenarios the answer is yes, but in cases like sort merge join, two sort 
nodes reusing the same child will make the shuffle records written and records 
read differ. I also added cases here:


https://github.com/xuanyuanking/spark/blob/SPARK-26193/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala#L217-L222


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239050549
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

I agree "fetched" matches the naming used in `ShuffleBlockFetcherIterator`, but 
do you mean just changing the display text in the UI? Many places, even 
api.scala, use the name `localBlocksFetched`, so changing them all may not be a 
good choice for backporting. WDYT?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049398
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NANO_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 10)
--- End diff --

Sorry about this; I changed it to `/ 1000 / 1000` as other places do, for safety.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049121
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +78,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NANO_TIMING_METRIC = "nanosecond"
--- End diff --

Done in cf35b9f.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049030
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
+reporters: Seq[ShuffleWriteMetricsReporter]) extends 
ShuffleWriteMetricsReporter {
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+reporters.foreach(_.incBytesWritten(v))
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.decRecordsWritten(v))
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.incRecordsWritten(v))
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+reporters.foreach(_.incWriteTime(v))
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+reporters.foreach(_.decBytesWritten(v))
+  }
+}
+
+
+/**
+ * A proxy class of ShuffleReadMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleReadMetricsReporter(
--- End diff --

Got it, thanks for your guidance. I reverted to the old approach with only minor changes to `SQLShuffleReadMetricsReporter`, following https://github.com/apache/spark/pull/23147.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239048356
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

Thanks for your guidance, Reynold and Wenchen. I chose the second implementation: it is the less heavyweight option and keeps a usage pattern similar to `SQLShuffleReadMetricsReporter`. Done in cf35b9f.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238909822
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

For the write metrics, it's different: the default reporter calls the SQL one, which requires hacking the default reporter to register external reporters.

Maybe we should not change the read side, and instead create a special `PairShuffleWriteMetricsReporter` that updates both the SQL reporter and the default reporter.

Another idea: `ShuffleDependency` carries a `reporter => reporter` function instead of a reporter. Then we can create a SQL reporter that takes another reporter (similar to the read side), and put the SQL reporter's constructor in `ShuffleDependency`.
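
Below is a minimal, self-contained sketch of that second idea, using illustrative stand-in names rather than Spark's actual classes (only one metric is shown).

```scala
// Sketch only: the dependency carries a `reporter => reporter` function, so the
// SQL side can wrap whatever task-level reporter is supplied at execution time.
trait WriteReporterSketch {
  def incRecordsWritten(v: Long): Unit
}

// The dependency only knows how to decorate a reporter handed to it later.
final case class ShuffleDependencySketch(
    createMetricsReporter: Option[WriteReporterSketch => WriteReporterSketch] = None)

// SQL-side reporter: records the value for the SQL UI and forwards to the wrapped one.
final class SqlWriteReporterSketch(underlying: WriteReporterSketch) extends WriteReporterSketch {
  var sqlRecordsWritten = 0L // would be a SQLMetric in the real code
  override def incRecordsWritten(v: Long): Unit = {
    sqlRecordsWritten += v
    underlying.incRecordsWritten(v) // keep the task-level metrics up to date
  }
}

object ReporterWiringDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for the task-level shuffle write metrics.
    val taskReporter = new WriteReporterSketch {
      var recordsWritten = 0L
      override def incRecordsWritten(v: Long): Unit = recordsWritten += v
    }
    val dep = ShuffleDependencySketch(Some(r => new SqlWriteReporterSketch(r)))
    // At task time, wrap the task reporter before handing it to the shuffle writer.
    val effective = dep.createMetricsReporter.map(_(taskReporter)).getOrElse(taskReporter)
    effective.incRecordsWritten(3) // updates both the SQL value and the task value
  }
}
```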


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238845399
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
   val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", 
"value")
   // Assume the execution plan is
-  // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
+  // Project(nodeId = 0)
+  // +- ShuffledHashJoin(nodeId = 1)
+  // :- Exchange(nodeId = 2)
+  // :  +- Project(nodeId = 3)
+  // : +- LocalTableScan(nodeId = 4)
+  // +- Exchange(nodeId = 5)
+  // +- Project(nodeId = 6)
+  // +- LocalTableScan(nodeId = 7)
   val df = df1.join(df2, "key")
   testSparkPlanMetrics(df, 1, Map(
 1L -> (("ShuffledHashJoin", Map(
   "number of output rows" -> 2L,
-  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"
+  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+2L -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
--- End diff --

is this always going to be the same as "shuffle records written" ?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238845029
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

I think we should be consistent and name these "read" rather than "fetch".



---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238843017
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NANO_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 10)
--- End diff --

is this the right conversion from nanosecs to millisecs?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238842276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +78,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NANO_TIMING_METRIC = "nanosecond"
--- End diff --

ns


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238837000
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
+reporters: Seq[ShuffleWriteMetricsReporter]) extends 
ShuffleWriteMetricsReporter {
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+reporters.foreach(_.incBytesWritten(v))
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.decRecordsWritten(v))
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.incRecordsWritten(v))
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+reporters.foreach(_.incWriteTime(v))
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+reporters.foreach(_.decBytesWritten(v))
+  }
+}
+
+
+/**
+ * A proxy class of ShuffleReadMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleReadMetricsReporter(
--- End diff --

Again - I think your old approach is much better. There's no point creating a general util when there is only one implementation and no known future need.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238836448
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

I'd not create a general API here. Just put one in SQL, similar to the read side, that also calls the default one.

It can be expensive to go through a Seq for every record and byte update.
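
To make the trade-off concrete, here is a short self-contained sketch with stand-in names (not Spark's actual classes): the grouped variant walks a `Seq` on every update, while a direct SQL-side wrapper calls the single default reporter.

```scala
// Sketch only. Compare a Seq-based proxy with a direct wrapper on the hot path.
trait ReporterSketch { def incRecordsWritten(v: Long): Unit }

// Grouped: generic, but traverses a Seq for every single record written.
final class GroupedReporterSketch(reporters: Seq[ReporterSketch]) extends ReporterSketch {
  override def incRecordsWritten(v: Long): Unit = reporters.foreach(_.incRecordsWritten(v))
}

// Direct wrapper: updates its own counter, then the single default reporter.
final class SqlReporterSketch(default: ReporterSketch) extends ReporterSketch {
  var sqlRecords = 0L // stand-in for a SQLMetric
  override def incRecordsWritten(v: Long): Unit = {
    sqlRecords += v
    default.incRecordsWritten(v)
  }
}
```

Either shape produces the same totals; the difference is only the per-record overhead of the extra `Seq` traversal.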


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238732441
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

Cool! That's a cleaner and more consistent implementation for both the read and write metrics reporters, and the read metrics can extend `ShuffleReadMetricsReporter` directly. Done in ca6c407


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238633725
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

a simpler idea:
1. create a `class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter`, which proxies all metric updates to the input reporters.
2. create a `GroupedShuffleWriteMetricsReporter` instance here: `new GroupedShuffleWriteMetricsReporter(Seq(dep.shuffleWriteMetricsReporter.get, context.taskMetrics().shuffleWriteMetrics))`, and pass it to `manager.getWriter`.

I think we can use the same approach for read metrics as well.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238630996
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

This happens per task, so I think `ShuffleWriteMetrics.externalReporters` can be an Option instead of an array.


---





[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-03 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/23207

[SPARK-26193][SQL] Implement shuffle write metrics in SQL

## What changes were proposed in this pull request?

1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`.
2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create the corresponding `SQLShuffleWriteMetricsReporter` in the shuffle dependency.
3. Expand the current `ShuffleWriteMetrics` in the task context to act as a proxy, and register the shuffle write metrics reporter with it when the ShuffleMapTask is created on the executor (a rough illustrative sketch follows below).
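
A rough, self-contained sketch of the wiring in item 3, with stand-in names rather than Spark's actual classes: the task-level write metrics object acts as a proxy and forwards every update to an optionally registered external (SQL) reporter.

```scala
// Sketch only: task-level shuffle write metrics as a proxy with an optional
// external reporter registered once per task.
trait WriteMetricsReporterSketch {
  def incRecordsWritten(v: Long): Unit
  def incBytesWritten(v: Long): Unit
}

final class TaskShuffleWriteMetricsSketch extends WriteMetricsReporterSketch {
  var recordsWritten = 0L
  var bytesWritten = 0L
  private var external: Option[WriteMetricsReporterSketch] = None

  // Called when the ShuffleMapTask starts, if the dependency carries a SQL reporter.
  def registerExternalReporter(reporter: WriteMetricsReporterSketch): Unit =
    external = Some(reporter)

  override def incRecordsWritten(v: Long): Unit = {
    recordsWritten += v
    external.foreach(_.incRecordsWritten(v))
  }
  override def incBytesWritten(v: Long): Unit = {
    bytesWritten += v
    external.foreach(_.incBytesWritten(v))
  }
}
```

Using an `Option` for the external reporter here follows the per-task registration pattern discussed earlier in the thread.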

## How was this patch tested?
Added a UT in SQLMetricsSuite.
Manually tested locally; updated the screenshot in the document attached to the JIRA.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-26193

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23207


commit 6b26c629439973045da77f7bcd4b852afe8ebd8b
Author: Yuanjian Li 
Date:   2018-12-02T12:19:55Z

Commit for fist time success

commit a8a1225837419c99a3d9941046a2ca6b501f6dc8
Author: Yuanjian Li 
Date:   2018-12-03T12:06:34Z

Simplify implement by add metrics in ShuffleExchangeExec

commit 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb
Author: Yuanjian Li 
Date:   2018-12-03T12:40:41Z

code clean and comments




---
