[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239995006
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

Yea, that was done and reverted in 
https://github.com/apache/spark/pull/23207/commits/7d104ebe854effb3d8ceb63cd408b9749cee1a8a;
I will separate it into another PR after this.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239748512
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

Ah, I think I see what you mean. Yea, after we pass the context, more things 
can be done in this interface; I'll delete this comment.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239744840
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(context: TaskContext): 
ShuffleWriteMetricsReporter = {
--- End diff --

Yea, thanks.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239744767
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

Thanks, there's no back and forth; thanks for your advice and help all 
along, Wenchen. :)


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239743452
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

Not stale; maybe I didn't express it clearly. What I want to say here is that 
we always return a proxy reporter, like the current 
SQLShuffleWriteMetricsReporter: it not only updates its own metrics (the local 
accumulator) but also the existing reporter passed in (like the metrics in the 
context).


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
retest this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
```
the code looks much cleaner now!
```
Sorry for the original rushed code; I should and will pay more attention 
to clean coding and to more discussion of alternative implementations.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NS_TIMING_METRIC = "nanosecond"
--- End diff --

How about naming it `NORMALIZE_TIMING_METRIC`? Maybe it can be reused 
later for other timing metrics that need unit normalization. If you think it's 
a strange name, I'll change it back.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698273
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
   new ShuffleDependency[Int, InternalRow, InternalRow](
 rddWithPartitionIds,
 new PartitionIdPassthrough(part.numPartitions),
-serializer)
+serializer,
+shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
 
 dependency
   }
+
+  /**
+   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the 
default metrics reporter
+   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for 
[[ShuffleWriteProcessor]].
+   */
+  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): 
ShuffleWriteProcessor = {
+(reporter: ShuffleWriteMetricsReporter) => {
--- End diff --

Yes, it can't work with Scala 2.11; it should be written in a more readable 
way. Done in 6378a3d.
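
A rough sketch of the Scala 2.11 issue, using simplified stand-in types rather 
than the real Spark classes (the trait and method names below are hypothetical):

```scala
// Stand-in for a single-abstract-method (SAM) trait like ShuffleWriteProcessor.
trait Processor {
  def process(input: String): String
}

object SamSketch {
  // Scala 2.12+ can convert this lambda into a Processor via SAM conversion,
  // but Scala 2.11 cannot do that for traits (short of -Xexperimental):
  // val viaLambda: Processor = (input: String) => input + "-processed"

  // 2.11-compatible and arguably more readable: an explicit anonymous class.
  val viaAnonClass: Processor = new Processor {
    override def process(input: String): String = input + "-processed"
  }

  def main(args: Array[String]): Unit =
    println(viaAnonClass.process("reporter"))   // prints "reporter-processed"
}
```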


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239698174
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

Copy, the trait can be added when we need more customization for SQL 
shuffle. Done in 6378a3d.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
```
Can we put the above in a closure and pass it into shuffle dependency? Then 
in SQL we just put the above in SQL using custom metrics.
```
Yea, commit a780b70 achieves this by adding the `ShuffleWriteProcessor` 
abstraction.
And the read metrics rename was reverted in 7d104eb; I will do it, together 
with the display change, in another PR.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239548704
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
--- End diff --

Reimplementation done in a780b70.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239312090
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

Copy that; the display text change will be done in another PR.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
```
can you separate the prs to rename read side metric and the write side 
change?
```
No problem; the next commit will revert the read-side rename changes.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311564
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
--- End diff --

As per our discussion here 
(https://github.com/apache/spark/pull/23207#discussion_r238909822), the latest 
approach chooses to carry a function of (reporter => reporter) in the shuffle 
dependency to create the SQLShuffleWriteMetricsReporter in ShuffleMapTask.
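
For illustration, a rough sketch of the "(reporter => reporter)" idea, with 
simplified stand-in types (Reporter, Dep, etc. are hypothetical, not the real 
Spark classes):

```scala
// The driver side puts a wrapping function into a serializable dependency;
// the task side applies it to the context's reporter, so the proxy that also
// feeds the SQL-side accumulator is built where the writes actually happen.
trait Reporter extends Serializable { def incRecordsWritten(v: Long): Unit }

class Counter extends Reporter {
  var records = 0L
  override def incRecordsWritten(v: Long): Unit = records += v
}

// Stand-in for ShuffleDependency carrying the (reporter => reporter) function.
case class Dep(wrapReporter: Reporter => Reporter)

object MapTaskSketch {
  def main(args: Array[String]): Unit = {
    val taskSideCounter = new Counter   // e.g. the reporter from the task context
    val sqlSideCounter  = new Counter   // e.g. a SQLMetric-backed accumulator

    val dep = Dep(base => new Reporter {             // proxy updates both reporters
      override def incRecordsWritten(v: Long): Unit = {
        base.incRecordsWritten(v)
        sqlSideCounter.incRecordsWritten(v)
      }
    })

    val reporter = dep.wrapReporter(taskSideCounter) // done inside the "map task"
    reporter.incRecordsWritten(2L)
    println((taskSideCounter.records, sqlSideCounter.records)) // (2,2)
  }
}
```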


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
--- End diff --

Both should be private lazy vals (also the newly added readMetrics); I'll 
change them.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239311018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,12 +38,18 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private val writeMetrics = 
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  override lazy val metrics =
--- End diff --

Thanks, makes sense; I'll change it to separate the read and write metrics 
and pass them both.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239069014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
 FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait 
time"),
 RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different 
with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => 
reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create 
on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this 
SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+metrics: Map[String, SQLMetric])(metricsReporter: 
ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+metricsReporter.incBytesWritten(v)
+_bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+metricsReporter.incRecordsWritten(v)
+_recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+metricsReporter.incWriteTime(v)
+_writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+metricsReporter.decBytesWritten(v)
+_bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

Just this shuffle write time in this PR. The remaining time metric is 
`fetch wait time`, which is in ms and set in `ShuffleBlockFetcherIterator`.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239067552
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NS_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

Maybe it's OK; I tested this locally with the UT in SQLMetricsSuite, result 
below:
```
shuffle records written: 2
shuffle write time total (min, med, max): 37 ms (37 ms, 37 ms, 37 ms)
shuffle bytes written total (min, med, max): 66.0 B (66.0 B, 66.0 B, 66.0 B)
```
In real scenarios the shuffle bytes written will be much larger, so keeping 
the time at ms granularity is maybe enough, WDYT?
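
As a small sketch of the conversion under discussion (`Utils.msDurationToString` 
is only approximated by a plain string here, for illustration):

```scala
object NsTimingSketch {
  // ns -> ms, matching the `duration / 1000 / 1000` form in the quoted diff.
  def toMs(durationNs: Long): Long = durationNs / 1000 / 1000

  def main(args: Array[String]): Unit = {
    val writeTimeNs = 37L * 1000 * 1000        // 37 ms worth of nanoseconds
    println(s"${toMs(writeTimeNs)} ms")        // "37 ms", as in the UT output above
  }
}
```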


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239054315
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -299,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
   val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", 
"value")
   // Assume the execution plan is
-  // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
+  // Project(nodeId = 0)
+  // +- ShuffledHashJoin(nodeId = 1)
+  // :- Exchange(nodeId = 2)
+  // :  +- Project(nodeId = 3)
+  // : +- LocalTableScan(nodeId = 4)
+  // +- Exchange(nodeId = 5)
+  // +- Project(nodeId = 6)
+  // +- LocalTableScan(nodeId = 7)
   val df = df1.join(df2, "key")
   testSparkPlanMetrics(df, 1, Map(
 1L -> (("ShuffledHashJoin", Map(
   "number of output rows" -> 2L,
-  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"
+  "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+2L -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
--- End diff --

For most scenarios the answer is yes, but in cases like sort merge join, 2 
sort nodes reusing the same child will make shuffle records written and 
records read differ. I also added cases here:

https://github.com/xuanyuanking/spark/blob/SPARK-26193/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala#L217-L222


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239050549
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -170,13 +172,23 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions
 testSparkPlanMetrics(df, 1, Map(
   2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))),
+  1L -> (("Exchange", Map(
+"shuffle records written" -> 2L,
+"records read" -> 2L,
+"local blocks fetched" -> 2L,
--- End diff --

I agree "fetch" is a more code name in `ShuffleBlockFetcherIterator`, but 
do you mean just change the display in ui? Cause there's many place even 
api.scala use the name `localBlocksFetched`, change them all maybe not a good 
choice for code backport, WDYT?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049398
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -163,6 +171,8 @@ object SQLMetrics {
 Utils.bytesToString
   } else if (metricsType == TIMING_METRIC) {
 Utils.msDurationToString
+  } else if (metricsType == NANO_TIMING_METRIC) {
+duration => Utils.msDurationToString(duration / 10)
--- End diff --

Sorry... Sorry for this; I'll change it to `1000 / 1000` as other places do, 
for safety.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049121
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +78,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NANO_TIMING_METRIC = "nanosecond"
--- End diff --

Done in cf35b9f.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239049030
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
+reporters: Seq[ShuffleWriteMetricsReporter]) extends 
ShuffleWriteMetricsReporter {
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+reporters.foreach(_.incBytesWritten(v))
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.decRecordsWritten(v))
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+reporters.foreach(_.incRecordsWritten(v))
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+reporters.foreach(_.incWriteTime(v))
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+reporters.foreach(_.decBytesWritten(v))
+  }
+}
+
+
+/**
+ * A proxy class of ShuffleReadMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleReadMetricsReporter(
--- End diff --

Got it, thanks for your guidance; reverted to the old approach with just 
small changes for `SQLShuffleReadMetricsReporter`, following 
https://github.com/apache/spark/pull/23147.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239048356
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics 
updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

Thanks for your guidance, Reynold and Wenchen. I chose the second 
implementation; it takes into account both being the less heavy option and 
having a usage pattern similar to `SQLShuffleReadMetricsReporter`. Done in 
cf35b9f.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r238732441
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
   threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
 } else 0L
 
+// Register the shuffle write metrics reporter to shuffleWriteMetrics.
+if (dep.shuffleWriteMetricsReporter.isDefined) {
+  
context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

Cool! That's a cleaner and more consistent implementation for both the read 
and write metrics reporters; the read metrics can also extend 
`ShuffleReadMetricsReporter` directly. Done in ca6c407.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-04 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
Thanks for your reply, Wenchen. There's a sketch doc attached in the JIRA: 
https://docs.google.com/document/d/1DX0gLkpk_NCE5MwI1_m4gnA2rLdjDkynZ02u2VWDR-8/edit

```
IMO shuffle write metrics is hard, as an RDD can have shuffle dependencies 
with multiple upstream RDDs. That said, in general the shuffle write metrics 
should belong to the upstream RDDs.
```
That's right, and that's also what I tried to do at first: logically the 
upstream operator triggers the shuffle write, and the first-attempt 
implementation also changed the SparkPlan base class to achieve this.

```
In Spark SQL, it's a little simpler, as the ShuffledRowRDD always have only 
one child, so it's reasonable to say that shuffle write metrics belong to 
ShuffledRowRDD.
```
Yes, maybe this is also what Reynold suggested: ShuffleExchangeExec has only 
one child, so we can simplify the implementation. But as the shuffle write 
metrics are updated inside the task, the core module still needs some changes.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
cc @cloud-fan @gatorsmile @rxin 


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
retest this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
@SparkQA test this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
test this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
@SparkQA retest this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
@AmplabJenkins retest this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
@AmplabJenkins test this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
@AmplabJenkins


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
retest this please.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23207
  
test this please.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-03 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/23207

[SPARK-26193][SQL] Implement shuffle write metrics in SQL

## What changes were proposed in this pull request?

1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the 
customized `ShuffleWriteMetricsReporter`.
2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these 
metrics to create corresponding `SQLShuffleWriteMetricsReporter` in shuffle 
dependency.
3. Expand the current `ShuffleWriteMetrics` in the task context as a proxy, 
and register the shuffle write metrics reporter to it when the ShuffleMapTask 
is created on the executor.

## How was this patch tested?
Added a UT in SQLMetricsSuite.
Manually tested locally; updated the screenshot in the document attached in 
JIRA.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-26193

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23207


commit 6b26c629439973045da77f7bcd4b852afe8ebd8b
Author: Yuanjian Li 
Date:   2018-12-02T12:19:55Z

Commit for fist time success

commit a8a1225837419c99a3d9941046a2ca6b501f6dc8
Author: Yuanjian Li 
Date:   2018-12-03T12:06:34Z

Simplify implement by add metrics in ShuffleExchangeExec

commit 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb
Author: Yuanjian Li 
Date:   2018-12-03T12:40:41Z

code clean and comments




---




[GitHub] spark issue #23175: [SPARK-26142]followup: Move sql shuffle read metrics rel...

2018-11-30 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23175
  
Thanks @cloud-fan @rxin.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...

2018-11-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r237346452
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan. This param should not 
empty and
+ *   contains all shuffle metrics defined in 
[[SQLMetrics.getShuffleReadMetrics]].
+ */
+private[spark] class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
--- End diff --

Thanks, done in https://github.com/apache/spark/pull/23175.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...

2018-11-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r237346431
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -194,4 +202,16 @@ object SQLMetrics {
 SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m 
=> m.id -> m.value)))
 }
   }
+
+  /**
+   * Create all shuffle read relative metrics and return the Map.
+   */
+  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = 
Map(
--- End diff --

Thanks, renamed it to createShuffleReadMetrics and moved it into 
SQLShuffleMetricsReporter.


---




[GitHub] spark pull request #23175: [SPARK-26142]followup: Move sql shuffle read metr...

2018-11-28 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/23175

[SPARK-26142]followup: Move sql shuffle read metrics relatives to 
SQLShuffleMetricsReporter

## What changes were proposed in this pull request?

Follow-up for https://github.com/apache/spark/pull/23128: move the SQL 
shuffle read metrics related code into `SQLShuffleMetricsReporter`, in order 
to keep the SQL shuffle read metrics code together and avoid the possible 
problem of forgetting to update SQLShuffleMetricsReporter when new metrics are 
added by others.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-26142-follow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23175


commit c042d15920f4ebd7166caafcb3696683978d42de
Author: Yuanjian Li 
Date:   2018-11-29T02:47:33Z

Move all sql shuffle read metrics stuff to SQLShuffleMetricsReporter




---




[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL

2018-11-28 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
@rxin Thanks for the guidance, I'll address these comments in a follow-up PR 
soon.


---




[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL

2018-11-28 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
Thanks @cloud-fan @gatorsmile @rxin !


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...

2018-11-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236982643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,10 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
 val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+// Wrap the tempMetrics with SQLShuffleMetricsReporter here to support
+// shuffle read metrics in SQL.
--- End diff --

Thanks Wenchen, done in 8e84c5b.


---




[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
retest this please


---




[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
Python UT failed because the JVM crashed.
Retest this please.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236926403
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,10 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
 val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+// metrics here could be empty cause user can use ShuffledRowRDD 
directly,
+// so we just use the tempMetrics created in TaskContext in this case.
--- End diff --

Removing this.


---




[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23153
  
Thanks for the fix from Wenchen, 
```
the suites should also construct the dummy python udf from both side.
```
I fixed the suites locally; they can be simply modified like:
```
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
index d3867f2b6b..a0f8ae2fc7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
@@ -40,13 +40,18 @@ class PullOutPythonUDFInJoinConditionSuite extends 
PlanTest {
 CheckCartesianProducts) :: Nil
   }

-  val testRelationLeft = LocalRelation('a.int, 'b.int)
-  val testRelationRight = LocalRelation('c.int, 'd.int)
+  val attrA = 'a.int
+  val attrB = 'b.int
+  val attrC = 'c.int
+  val attrD = 'd.int
+
+  val testRelationLeft = LocalRelation(attrA, attrB)
+  val testRelationRight = LocalRelation(attrC, attrD)

   // Dummy python UDF for testing. Unable to execute.
   val pythonUDF = PythonUDF("pythonUDF", null,
 BooleanType,
-Seq.empty,
+Seq(attrA, attrC),
 PythonEvalType.SQL_BATCHED_UDF,
 udfDeterministic = true)

@@ -118,7 +123,7 @@ class PullOutPythonUDFInJoinConditionSuite extends 
PlanTest {
   test("pull out whole complex condition with multiple python udf") {
 val pythonUDF1 = PythonUDF("pythonUDF1", null,
   BooleanType,
-  Seq.empty,
+  Seq(attrA, attrC),
   PythonEvalType.SQL_BATCHED_UDF,
   udfDeterministic = true)
 val condition = (pythonUDF || 'a.attr === 'c.attr) && pythonUDF1
```


---




[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23153#discussion_r236743539
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
 }
 
 /**
- * PythonUDF in join condition can not be evaluated, this rule will detect 
the PythonUDF
- * and pull them out from join condition. For python udf accessing 
attributes from only one side,
- * they are pushed down by operation push down rules. If not (e.g. user 
disables filter push
- * down rules), we need to pull them out in this rule too.
+ * PythonUDF in join condition can't be evaluated if it refers to 
attributes from both join sides.
+ * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable 
PythonUDF and pull them
+ * out from join condition.
  */
 object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
-  def hasPythonUDF(expression: Expression): Boolean = {
-expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+
+  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean 
= {
+expr.find { e =>
+  PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && 
!canEvaluate(e, j.right)
+}.isDefined
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case j @ Join(_, _, joinType, condition)
-if condition.isDefined && hasPythonUDF(condition.get) =>
+case j @ Join(_, _, joinType, Some(cond)) if 
hasUnevaluablePythonUDF(cond, j) =>
--- End diff --

Following the rule changes, we need to modify the suites in 
`PullOutPythonUDFInJoinConditionSuite`; the suites should also construct the 
dummy Python UDF with attributes from both sides.


---




[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23153#discussion_r236647128
  
--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -209,6 +209,18 @@ def test_udf_in_join_condition(self):
 with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+def test_udf_in_left_outer_join_condition(self):
+# regression test for SPARK-26147
+from pyspark.sql.functions import udf, col
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a: str(a), StringType())
+# The join condition can't be pushed down, as it refers to 
attributes from both sides.
+# The Python UDF only refer to attributes from one side, so it's 
evaluable.
+df = left.join(right, f("a") == col("b").cast("string"), how = 
"left_outer")
--- End diff --

style nit: how="left_outer"


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236720141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
 val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+// metrics here could be empty cause user can use ShuffledRowRDD 
directly,
--- End diff --

Done in 0348ae5.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236646423
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ---
@@ -154,7 +156,14 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
 val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+// metrics here could be empty cause user can use ShuffledRowRDD 
directly,
--- End diff --

```
do you mean we may leave the metrics empty when creating ShuffledRowRDD in 
tests?
```
Yes, like we did in `UnsafeRowSerializerSuite`.
```
I don't think we need to consider this case since ShuffledRowRDD is a 
private API
```
Got it; after searching for `new ShuffledRowRDD` in all of the source code, 
`UnsafeRowSerializerSuite` is the only place, so I'll change the test and 
delete the default value of `metrics` in this commit.


---




[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236251210
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
--- End diff --

Thanks for your advice Wenchen. I tried `sync this list with ShuffleReadMetrics`
locally and have these comments:
1. It's easy to sync SQLMetrics with `ShuffleReadMetrics` while the task has only
one shuffle reader; just calling `ShuffleMetricsReporter.incXXX` achieves this
(see the sketch below).
2. But for multiple shuffle readers in a single task, we need to add `setXXX`
functions to the `ShuffleMetricsReporter` trait, because we need to reset the
SQLMetrics after `setMergeValues` is called by every shuffle reader.
3. I also tried to achieve this without changing `ShuffleMetricsReporter`, e.g.
calling `ShuffleMetricsReporter.incXXX` on the driver side at taskEnd, but that is
probably not a good way.

If you think it makes sense to change the `ShuffleMetricsReporter` trait, I'll
push a commit soon.
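
For reference, a minimal sketch of the single-reader case in point 1, assuming a
cut-down reporter trait and the metric keys from the diff above (the real
`ShuffleMetricsReporter` has more methods):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Cut-down trait for illustration only; the real ShuffleMetricsReporter has
// one incXXX method per shuffle read metric.
trait SimpleShuffleMetricsReporter {
  def incRemoteBlocksFetched(v: Long): Unit
  def incRecordsRead(v: Long): Unit
}

// Single-reader case: forwarding every incXXX call into the SQLMetric with the
// matching key keeps the SQL UI value in sync with ShuffleReadMetrics.
class SQLSyncingReporter(metrics: Map[String, SQLMetric])
  extends SimpleShuffleMetricsReporter {
  override def incRemoteBlocksFetched(v: Long): Unit =
    metrics("remoteBlocksFetched").add(v)
  override def incRecordsRead(v: Long): Unit =
    metrics("recordsRead").add(v)
}
```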


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23105: [SPARK-26140] Enable custom metrics implementatio...

2018-11-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23105#discussion_r236036258
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+/**
+ * An interface for reporting shuffle read metrics, for each shuffle. This 
interface assumes
+ * all the methods are called on a single-threaded, i.e. concrete 
implementations would not need
+ * to synchronize.
+ *
+ * All methods have additional Spark visibility modifier to allow public, 
concrete implementations
+ * that still have these methods marked as private[spark].
+ */
+private[spark] trait ShuffleReadMetricsReporter {
--- End diff --

https://github.com/apache/spark/pull/23128 :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...

2018-11-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...

2018-11-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/23128
  
@gatorsmile Thanks Xiao! Conflicts are resolved. As Reynold commented in 
https://github.com/apache/spark/pull/23105#discussion_r235950427, when 
ShuffleMetricsReporter is renamed to ShuffleReadMetricsReporter in the write PR, 
it will conflict again here; I'll keep tracking the relevant PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...

2018-11-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/23128#discussion_r236032855
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan.
+ */
+class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
+  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v)
--- End diff --

Sorry for overlooking that this is a per-row operation; I should have been 
more careful. Fix done in cb46bfe.
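
For context, the concern is that `metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED).add(v)`
does a map lookup on every call, which is too costly on a per-row path. A sketch of
the usual fix, resolving the lookups once at construction time (only two metrics
are shown, and the actual commit may differ in detail, e.g. whether calls are also
forwarded to the task-level temp metrics):

```scala
package org.apache.spark.sql.execution.metric

import org.apache.spark.executor.TempShuffleReadMetrics

class SQLShuffleMetricsReporter(
    tempMetrics: TempShuffleReadMetrics,
    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {

  // Resolve the map lookups once; the hot incXXX path then only touches fields.
  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)

  override def incRemoteBlocksFetched(v: Long): Unit = {
    _remoteBlocksFetched.add(v)
    // One possible design: also forward to the task-level temp metrics.
    tempMetrics.incRemoteBlocksFetched(v)
  }

  override def incRecordsRead(v: Long): Unit = {
    _recordsRead.add(v)
    tempMetrics.incRecordsRead(v)
  }
}
```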


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23128: [SPARK-26139][SQL] Support passing shuffle metric...

2018-11-23 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/23128

[SPARK-26139][SQL] Support passing shuffle metrics to exchange operator

## What changes were proposed in this pull request?

Implement `SQLShuffleMetricsReporter` on the SQL side as the customized 
ShuffleMetricsReporter; it extends `TempShuffleReadMetrics` and updates SQLMetrics, 
so that shuffle metrics can be reported in the SQL UI.

## How was this patch tested?

Add a UT in SQLMetricsSuite.
Manually tested locally. Before:

![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png)
after:

![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-26142

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23128


commit fba590f040c8fd7ce75df3f733f246db18e79ee6
Author: Reynold Xin 
Date:   2018-11-21T14:56:23Z

[SPARK-26140] Pull TempShuffleReadMetrics creation out of shuffle reader

commit 35b48b21028110742aed7f7f5b5d62109c2f0adf
Author: Reynold Xin 
Date:   2018-11-21T15:02:04Z

less movement of code

commit 1b556ecf869685af8f34d448ac3f08102a758124
Author: liyuanjian 
Date:   2018-11-23T21:02:25Z

[SPARK-26142] Implement shuffle read metric in SQL




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateF...

2018-11-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21363
  
@MaxGekk Sorry for the delay, something else came up on my schedule. I plan to 
start on this PR this weekend; if that's too late please just take it over. Sorry 
again for the delay.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22989: [SPARK-25986][Build] Add rules to ban throw Errors in ap...

2018-11-14 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22989
  
Thanks @HyukjinKwon @viirya @felixcheung @srowen for your review and advice!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...

2018-11-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r233432630
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala ---
@@ -331,7 +333,7 @@ class KMeansSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   }
 }
 
-object KMeansSuite extends SparkFunSuite {
+object KMeansSuite extends SparkFunSuite with Assertions {
--- End diff --

ah thanks! Done in 210d942.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...

2018-11-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r233432568
  
--- Diff: dev/checkstyle.xml ---
@@ -180,5 +180,10 @@
 
 
 
+
+
+
--- End diff --

I mean `application code` as opposed to the JVM here, as Error subclasses 
generally represent internal JVM errors.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22962
  
@HyukjinKwon No problem, I'll open a follow-up PR to address all your 
comments and rewrite the UT into a separate class.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233283645
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

```
could you add some comments to explain it?
```
@cloud-fan Sorry for the thin explanation; more comments should have been added 
in the first place. Will be done in the follow-up PR.
```
Can we get rid of the rewrite all?
we should remove __init__ too
next time please fully describe what's going on in PR description
```
@HyukjinKwon Sorry for the thin explanation; all of these will be done in the 
next follow-up PR, along with the new UT.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233275410
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

```
could you add some comments to explain it?
```
@cloud-fan Sorry for the thin explanation; more comments should have been added 
in the first place. Will be done in the follow-up PR.
```
Can we get rid of the rewrite all?
we should remove __init__ too
next time please fully describe what's going on in PR description
```
@HyukjinKwon Sorry for the thin explanation; all of these will be done in the 
next follow-up PR, along with the new UT.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r233055939
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -147,8 +147,8 @@ def __init__(self):
 @classmethod
 def _getOrCreate(cls):
 """Internal function to get or create global BarrierTaskContext."""
-if cls._taskContext is None:
-cls._taskContext = BarrierTaskContext()
+if not isinstance(cls._taskContext, BarrierTaskContext):
+cls._taskContext = object.__new__(cls)
--- End diff --

I think the answer is no; maybe I was not clear enough in my previous 
explanation https://github.com/apache/spark/pull/22962#discussion_r232528333. 
Using `BarrierTaskContext()` here was my first commit 
https://github.com/apache/spark/pull/22962/commits/0cb2cf6e9ece66861073c31b579b595a9de5ce81
, and it would still need `__new__` to be overridden for `BarrierTaskContext`; 
otherwise the bug still exists, because its parent class `TaskContext` overrides 
`__new__()`, so when we call `BarrierTaskContext()` in a reused worker, a 
`TaskContext` instance is returned by 
`TaskContext.__new__()`: https://github.com/apache/spark/blob/c00e72f3d7530eb2ae43d4d45e8efde783daf6ff/python/pyspark/taskcontext.py#L47



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232986340
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

@HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll 
think about how to check for worker reuse and open a follow-up PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22962
  
Thanks @gatorsmile  @HyukjinKwon @cloud-fan !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232984147
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -283,7 +283,9 @@ class VectorIndexerSuite extends MLTest with 
DefaultReadWriteTest with Logging {
 points.zip(rows.map(_(0))).foreach {
   case (orig: SparseVector, indexed: SparseVector) =>
 assert(orig.indices.length == indexed.indices.length)
-  case _ => throw new UnknownError("Unit test has a bug in it.") 
// should never happen
+  case _ =>
+// should never happen
+throw new IllegalAccessException("Unit test has a bug in it.")
--- End diff --

Thanks, done in a4f49ce by just using `fail` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232983941
  
--- Diff: dev/checkstyle-suppressions.xml ---
@@ -46,4 +46,12 @@
   
files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
 
+

[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...

2018-11-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232955017
  
--- Diff: dev/checkstyle-suppressions.xml ---
@@ -46,4 +46,12 @@
   
files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
 
+

[GitHub] spark issue #22989: [SPARK-25986][Build] Add rules to ban throw Errors in ap...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22989
  
@srowen Great thanks for your guidance; addressed all your suggestions in 
ff234d3 and updated the record table in 
https://github.com/apache/spark/pull/22989#issuecomment-437939830.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232722383
  
--- Diff: dev/checkstyle.xml ---
@@ -64,6 +64,11 @@
 
 
 
+
+
--- End diff --

Thanks, done in 6e49bb8.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232722184
  
--- Diff: scalastyle-config.xml ---
@@ -240,6 +240,18 @@ This file is divided into 3 sections:
 ]]>
   
 
+  
--- End diff --

Thanks, the new rule is named `throwerror`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232721829
  
--- Diff: dev/checkstyle.xml ---
@@ -64,6 +64,11 @@
 
 
 
+
+
+
--- End diff --

Thanks, rewrote this rule in 6e49bb8.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22989#discussion_r232721412
  
--- Diff: scalastyle-config.xml ---
@@ -240,6 +240,18 @@ This file is divided into 3 sections:
 ]]>
   
 
+  
+throw new 
OutOfMemoryError
+

[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22989
  
cc all reviewers: per @srowen's suggestion, added a rule to ban all of the 
`throw new XXXError` cases.
The current `throw new XXXError` usages in the Spark source are listed below, 
recording whether each was fixed up or excluded, for review convenience (see the 
sketch below for the fix-up pattern):

ErrorName | Count | In Test/Source Code | Fix Up or Exclude
- | - | - | -
AssertionError | 32 | 29 in test code | Exclude
| | | 2 in UnsafeAlignedOffset | Fix up by changing them to `IllegalArgumentException`
| | | 1 in KafkaUtils | Just exclude, since fixing it would cause a behavior change
NotImplementedError | 22 | 7 in source code / 15 in test cases | Fix up by changing them to `UnsupportedOperationException`
OutOfMemoryError | 1 | 1 in test code | Exclude
LinkageError | 1 | 1 in test code | Exclude
SparkOutOfMemoryError | 5 | 5 in source code | Exclude
UnknownError | 9 | 5 in source code / 4 in test cases | Fix up by changing to `IllegalAccessException` and `IllegalArgumentException`

All of the above are done in 6e49bb8.
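
To make the "Fix up" entries concrete, here is a minimal sketch of the test-code 
pattern (the method and match arms are invented for illustration; the real call 
sites differ per file):

```scala
import org.scalatest.Assertions.fail

object ErrorFixupSketch {
  // Before (banned by the new rule): throwing a JVM Error subclass from a test.
  def checkOld(pair: (Any, Any)): Unit = pair match {
    case (orig: String, indexed: String) => assert(orig.length == indexed.length)
    case _ => throw new UnknownError("Unit test has a bug in it.")
  }

  // After: test code fails the test instead of throwing an Error subclass;
  // main code would throw IllegalArgumentException / UnsupportedOperationException.
  def checkNew(pair: (Any, Any)): Unit = pair match {
    case (orig: String, indexed: String) => assert(orig.length == indexed.length)
    case other => fail(s"Unit test has a bug in it: unexpected $other")
  }
}
```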


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22955
  
Thanks @mgaido91 @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232634698
  
--- Diff: python/pyspark/tests.py ---
@@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self):
 """
 Verify that BarrierTaskContext.barrier() with reused python worker.
 """
+self.sc._conf.set("spark.python.work.reuse", "true")
--- End diff --

I did these 2 checks as below:
1. Running this test case without the fix in `BarrierTaskContext._getOrCreate`, 
the bug can be reproduced.
2. Running the same code in the pyspark shell with 
`spark.python.work.reuse=false` set, it returns successfully.
Maybe this can prove that the UT covers the issue while also reusing the 
original barrier test code. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22989
  
Sorry for the late reply, and great thanks for all the reviewers' advice; will 
address it soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22962
  
@HyukjinKwon Thanks for your review; the comments are addressed and the PR 
description/title changes are done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232528333
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -144,10 +144,19 @@ def __init__(self):
 """Construct a BarrierTaskContext, use get instead"""
 pass
 
+def __new__(cls):
--- End diff --

Yep, doing this in `_getOrCreate` has the same effect; this was over-thinking 
https://github.com/apache/spark/blob/aec0af4a952df2957e21d39d1e0546a36ab7ab86/python/pyspark/taskcontext.py#L44-L45
Deleted in 02555b8.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22962#discussion_r232527808
  
--- Diff: python/pyspark/tests.py ---
@@ -614,6 +614,18 @@ def context_barrier(x):
 times = 
rdd.barrier().mapPartitions(f).map(context_barrier).collect()
 self.assertTrue(max(times) - min(times) < 1)
 
+def test_barrier_with_python_worker_reuse(self):
+"""
+Verify that BarrierTaskContext.barrier() with reused python worker.
+"""
+rdd = self.sc.parallelize(range(4), 4)
--- End diff --

Thanks, done in 02555b8.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232489060
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers._
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types.BooleanType
+
+class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Extract PythonUDF From JoinCondition", Once,
+PullOutPythonUDFInJoinCondition) ::
+  Batch("Check Cartesian Products", Once,
+CheckCartesianProducts) :: Nil
+  }
+
+  val testRelationLeft = LocalRelation('a.int, 'b.int)
+  val testRelationRight = LocalRelation('c.int, 'd.int)
+
+  // Dummy python UDF for testing. Unable to execute.
+  val pythonUDF = PythonUDF("pythonUDF", null,
+BooleanType,
+Seq.empty,
+PythonEvalType.SQL_BATCHED_UDF,
+udfDeterministic = true)
+
+  val unsupportedJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, 
LeftAnti)
+
+  private def comparePlansWithConf(query: LogicalPlan, expected: 
LogicalPlan): Unit = {
+// AnalysisException thrown by CheckCartesianProducts while 
spark.sql.crossJoin.enabled=false
+val exception = intercept[AnalysisException] {
+  Optimize.execute(query.analyze)
+}
+assert(exception.message.startsWith("Detected implicit cartesian 
product"))
+
+// pull out the python udf while set spark.sql.crossJoin.enabled=true
+withSQLConf(CROSS_JOINS_ENABLED.key -> "true") {
+  val optimized = Optimize.execute(query.analyze)
+  comparePlans(optimized, expected)
+}
+  }
+
+  test("inner join condition with python udf only") {
--- End diff --

I'm sorry for not replying to your previous comment `they differ 
only by the join type...`; they differ not only in the join type, but also in the 
expected plan.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232488956
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -50,20 +50,11 @@ class PullOutPythonUDFInJoinConditionSuite extends 
PlanTest {
 PythonEvalType.SQL_BATCHED_UDF,
 udfDeterministic = true)
 
-  val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti)
-
-  test("inner join condition with python udf only") {
-val query = testRelationLeft.join(
-  testRelationRight,
-  joinType = Inner,
-  condition = Some(pythonUDF))
-val expected = testRelationLeft.join(
-  testRelationRight,
-  joinType = Inner,
-  condition = None).where(pythonUDF).analyze
+  val unsupportedJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, 
LeftAnti)
 
+  private def comparePlansWithConf(query: LogicalPlan, expected: 
LogicalPlan): Unit = {
--- End diff --

How about `comparePlanWithCrossJoinEnable`? I was just afraid it would be too 
long at first; any advice? :) Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateF...

2018-11-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21363
  
@HyukjinKwon Great thanks for pinging me, I'll try to work on this and cc all 
the reviewers of this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22989: [SPARK-25986][Build] Banning throw new OutOfMemoryErrors

2018-11-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22989
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232163956
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers._
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types.BooleanType
+
+class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Extract PythonUDF From JoinCondition", Once,
+PullOutPythonUDFInJoinCondition) ::
+  Batch("Check Cartesian Products", Once,
+CheckCartesianProducts) :: Nil
+  }
+
+  val testRelationLeft = LocalRelation('a.int, 'b.int)
+  val testRelationRight = LocalRelation('c.int, 'd.int)
+
+  // Dummy python UDF for testing. Unable to execute.
+  val pythonUDF = PythonUDF("pythonUDF", null,
+BooleanType,
+Seq.empty,
+PythonEvalType.SQL_BATCHED_UDF,
+udfDeterministic = true)
+
+  val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti)
+
+  test("inner join condition with python udf only") {
+val query = testRelationLeft.join(
+  testRelationRight,
+  joinType = Inner,
+  condition = Some(pythonUDF))
+val expected = testRelationLeft.join(
+  testRelationRight,
+  joinType = Inner,
+  condition = None).where(pythonUDF).analyze
+
+// AnalysisException thrown by CheckCartesianProducts while 
spark.sql.crossJoin.enabled=false
+val exception = the [AnalysisException] thrownBy {
+  Optimize.execute(query.analyze)
+}
+assert(exception.message.startsWith("Detected implicit cartesian 
product"))
+
+// pull out the python udf while set spark.sql.crossJoin.enabled=true
+withSQLConf(CROSS_JOINS_ENABLED.key -> "true") {
+  val optimized = Optimize.execute(query.analyze)
+  comparePlans(optimized, expected)
+}
+  }
+
+  test("left semi join condition with python udf only") {
+val query = testRelationLeft.join(
+  testRelationRight,
+  joinType = LeftSemi,
+  condition = Some(pythonUDF))
+val expected = testRelationLeft.join(
+  testRelationRight,
+  joinType = Inner,
+  condition = None).where(pythonUDF).select('a, 'b).analyze
+
+// AnalysisException thrown by CheckCartesianProducts while 
spark.sql.crossJoin.enabled=false
+val exception = the [AnalysisException] thrownBy {
+  Optimize.execute(query.analyze)
+}
+assert(exception.message.startsWith("Detected implicit cartesian 
product"))
+
+// pull out the python udf while set spark.sql.crossJoin.enabled=true
+withSQLConf(CROSS_JOINS_ENABLED.key -> "true") {
+  val optimized = Optimize.execute(query.analyze)
+  comparePlans(optimized, expected)
+}
+  }
+
+  test("python udf with other common condition") {
--- End diff --

Thanks, added more cases in 38b1555.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232163715
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers._
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types.BooleanType
+
+class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Extract PythonUDF From JoinCondition", Once,
+PullOutPythonUDFInJoinCondition) ::
+  Batch("Check Cartesian Products", Once,
+CheckCartesianProducts) :: Nil
+  }
+
+  val testRelationLeft = LocalRelation('a.int, 'b.int)
+  val testRelationRight = LocalRelation('c.int, 'd.int)
+
+  // Dummy python UDF for testing. Unable to execute.
+  val pythonUDF = PythonUDF("pythonUDF", null,
+BooleanType,
+Seq.empty,
+PythonEvalType.SQL_BATCHED_UDF,
+udfDeterministic = true)
+
+  val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti)
--- End diff --

Thanks, done in 38b1555.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232163787
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers._
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types.BooleanType
+
+class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Extract PythonUDF From JoinCondition", Once,
+PullOutPythonUDFInJoinCondition) ::
+  Batch("Check Cartesian Products", Once,
+CheckCartesianProducts) :: Nil
+  }
+
+  val testRelationLeft = LocalRelation('a.int, 'b.int)
+  val testRelationRight = LocalRelation('c.int, 'd.int)
+
+  // Dummy python UDF for testing. Unable to execute.
+  val pythonUDF = PythonUDF("pythonUDF", null,
+BooleanType,
+Seq.empty,
+PythonEvalType.SQL_BATCHED_UDF,
+udfDeterministic = true)
+
+  val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti)
+
+  test("inner join condition with python udf only") {
--- End diff --

Sorry for this, done in 38b1555.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22955#discussion_r232163738
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullOutPythonUDFInJoinConditionSuite.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers._
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types.BooleanType
+
+class PullOutPythonUDFInJoinConditionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("Extract PythonUDF From JoinCondition", Once,
+PullOutPythonUDFInJoinCondition) ::
+  Batch("Check Cartesian Products", Once,
+CheckCartesianProducts) :: Nil
+  }
+
+  val testRelationLeft = LocalRelation('a.int, 'b.int)
+  val testRelationRight = LocalRelation('c.int, 'd.int)
+
+  // Dummy python UDF for testing. Unable to execute.
+  val pythonUDF = PythonUDF("pythonUDF", null,
+BooleanType,
+Seq.empty,
+PythonEvalType.SQL_BATCHED_UDF,
+udfDeterministic = true)
+
+  val notSupportJoinTypes = Seq(LeftOuter, RightOuter, FullOuter, LeftAnti)
+
+  test("inner join condition with python udf only") {
+val query = testRelationLeft.join(
+  testRelationRight,
+  joinType = Inner,
+  condition = Some(pythonUDF))
+val expected = testRelationLeft.join(
+  testRelationRight,
+  joinType = Inner,
+  condition = None).where(pythonUDF).analyze
+
+// AnalysisException thrown by CheckCartesianProducts while 
spark.sql.crossJoin.enabled=false
+val exception = the [AnalysisException] thrownBy {
--- End diff --

Thanks, done in 38b1555.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...

2018-11-08 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22989

[SPARK-25986][Build] Banning throw new OutOfMemoryErrors

## What changes were proposed in this pull request?

Add Scala and Java lint check rules to ban the usage of `throw new 
OutOfMemoryError`, because it gets the whole executor killed. See more details 
in https://github.com/apache/spark/pull/22969.
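
For illustration, a sketch of the kind of call site the rule targets. The 
`acquire` helper is invented for this example; `SparkOutOfMemoryError` (mentioned 
earlier in this PR's review as what Spark-internal code throws instead, so that 
only the task fails rather than the executor) is assumed to be 
`org.apache.spark.memory.SparkOutOfMemoryError`:

```scala
import org.apache.spark.memory.SparkOutOfMemoryError

object OomBanSketch {
  // Hypothetical allocation helper, for illustration only.
  def acquire(requested: Long, granted: Long): Long = {
    if (granted < requested) {
      // Banned: a plain OutOfMemoryError escapes task error handling and
      // takes the whole executor down.
      // throw new OutOfMemoryError(s"Unable to acquire $requested bytes")

      // What Spark-internal code throws instead: fails only the current task.
      throw new SparkOutOfMemoryError(
        s"Unable to acquire $requested bytes of memory, got $granted")
    }
    granted
  }
}
```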

## How was this patch tested?

Tested locally with lint-scala and lint-java.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25986

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22989


commit d67875115f622082519b1dbcb1c1e34c2184b34f
Author: Yuanjian Li 
Date:   2018-11-09T05:23:01Z

banning throw new OutOfMemoryErrors




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...

2018-11-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22955
  
Thanks for the reply; the unnecessary end-to-end tests were removed in 
https://github.com/apache/spark/pull/22326/commits/2b6977de4a3b3489b9c2172a6a8a39831bf1d048,
 but maybe the others should be kept? A mocked python udf on the scala side can't 
be 100% the same as the python side.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCo...

2018-11-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22955
  
cc @cloud-fan @mgaido91 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext while pyth...

2018-11-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22962
  
cc @cloud-fan @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext whi...

2018-11-07 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22962

[SPARK-25921][PySpark] Fix BarrierTaskContext while python worker reuse

## What changes were proposed in this pull request?

When a python worker is reused, BarrierTaskContext._getOrCreate() will still 
return a TaskContext, so we get an `AttributeError: 'TaskContext' object has no 
attribute 'barrier'`. Fix this by adding check logic in 
BarrierTaskContext._getOrCreate() and overriding the `__new__` method for 
BarrierTaskContext.

## How was this patch tested?

Add new UT in pyspark-core.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25921

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22962


commit 0cb2cf6e9ece66861073c31b579b595a9de5ce81
Author: Yuanjian Li 
Date:   2018-11-07T10:01:54Z

fix BarrierTaskContext while python worker reuse




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for ContextBarrierSta...

2018-11-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
gentle ping @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for ContextBarrierSta...

2018-11-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22955: [SPARK-25949][SQL] Add test for PullOutPythonUDFI...

2018-11-06 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22955

[SPARK-25949][SQL] Add test for PullOutPythonUDFInJoinCondition

## What changes were proposed in this pull request?

As commented in 
https://github.com/apache/spark/pull/22326#issuecomment-424923967, the newly 
added optimizer rule is currently only tested by end-to-end tests on the Python 
side; we need to add suites under `org.apache.spark.sql.catalyst.optimizer` like 
the other optimizer rules.

## How was this patch tested?
Newly added UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25949

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22955


commit 344b516a414107cf5494951ae74886b6f62f3e5a
Author: Yuanjian Li 
Date:   2018-11-06T12:29:26Z

Add test for PullOutPythonUDFInJoinCondition




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


