[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r219675105
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -995,7 +995,8 @@ class Dataset[T] private[sql](
 // After the cloning, left and right side will have distinct expression ids.
 val plan = withPlan(
   Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
-  .queryExecution.analyzed.asInstanceOf[Join]
+  .queryExecution.analyzed
+val joinPlan = plan.collectFirst { case j: Join => j }.get
--- End diff --

For reviewers: we need this change because the rule `HandlePythonUDFInJoinCondition` breaks the assumption that the analyzed join plan is always a bare `Join`. After adding the rule that handles Python UDFs, a `Filter` or `Project` node may be placed on top of the `Join`.
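
For illustration, a minimal sketch of the pattern the new code relies on (the helper name `firstJoin` is made up; only the `collectFirst` over the analyzed plan comes from the diff above):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

// After analysis the plan is no longer guaranteed to be a bare Join: once the
// Python-UDF rule runs, a Project or Filter may sit on top of it, so instead of
// `asInstanceOf[Join]` we search the tree for the first Join node.
def firstJoin(analyzed: LogicalPlan): Join =
  analyzed.collectFirst { case j: Join => j }.getOrElse(
    throw new IllegalStateException(s"No Join node found in:\n$analyzed"))
```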


---




[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...

2018-09-19 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22467#discussion_r218835749
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala 
---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+// The data where the partitioning key exists only in the directory 
structure.
+case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+intField: Int,
+stringField: String,
+structField: StructContainer,
+arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+p: Int,
+intField: Int,
+stringField: String,
+structField: StructContainer,
+arrayField: Seq[Int])
+
+/**
+ * A collection of tests for parquet data with various forms of 
partitioning.
+ */
+abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
+  import testImplicits._
+
+  var partitionedTableDir: File = null
+  var normalTableDir: File = null
+  var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+partitionedTableDir = Utils.createTempDir()
+normalTableDir = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDir, s"p=$p")
+  sparkContext.makeRDD(1 to 10)
+.map(i => ParquetData(i, s"part-$p"))
+.toDF()
+.write.parquet(partDir.getCanonicalPath)
+}
+
+sparkContext
+  .makeRDD(1 to 10)
+  .map(i => ParquetData(i, s"part-1"))
+  .toDF()
+  .write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
+
+partitionedTableDirWithKey = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+  sparkContext.makeRDD(1 to 10)
+.map(i => ParquetDataWithKey(p, i, s"part-$p"))
+.toDF()
+.write.parquet(partDir.getCanonicalPath)
+}
+
+partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, 
s"p=$p")
+  sparkContext.makeRDD(1 to 10).map { i =>
+ParquetDataWithKeyAndComplexTypes(
+  p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+  }.toDF().write.parquet(partDir.getCanonicalPath)
+}
+
+partitionedTableDirWithComplexTypes = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+  sparkContext.makeRDD(1 to 10).map { i =>
+ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, 
f"${i}_string"), 1 to i)
+  }.toDF().write.parquet(partDir.getCanonicalPath)
+}
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  partitionedTableDir.delete()
+  normalTableDir.delete()
+  partitionedTableDirWithKey.delete()
+  partitionedTableDirWithComplexTypes.dele

[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...

2018-09-19 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22467#discussion_r218844704
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala 
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class HiveParquetSourceSuite extends ParquetPartitioningTest {
+  import testImplicits._
+  import spark._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+dropTables("partitioned_parquet",
+  "partitioned_parquet_with_key",
+  "partitioned_parquet_with_complextypes",
+  "partitioned_parquet_with_key_and_complextypes",
+  "normal_parquet")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDir.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_key
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithKey.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW normal_parquet
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${new File(partitionedTableDir, "p=1").toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithComplexTypes.toURI}'
+  )
+""")
+  }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+sql("drop table if exists spark_6016_fix")
+
+// Create a DataFrame with two partitions. So, the created table will 
have two parquet files.
+val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
+
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
+checkAnswer(
+  sql("select * from spark_6016_fix"),
+  (1 to 10).map(i => Row(i))
+)
+
+// Create a DataFrame with four partitions. So, the created table will 
have four parquet files.
+val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
+
df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
+// For the bug of SPARK-6016, we are caching two outdated footers for 
df1. Then,
+// since the new table has four parquet files, we are trying to read 
new footers from two files
+// and then merge metadata in footers of these four (two outdated ones 
and two latest one),
+// which will cause an error.
+checkAnswer(
+  sql("select * from spark_6016_fix"),
+  (1 to 10).map(i => Row(i))
+)
+
+sql("drop table spark_6016_fix")
+  }
+
+  tes

[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...

2018-09-19 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22467#discussion_r218846178
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala 
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class HiveParquetSourceSuite extends ParquetPartitioningTest {
+  import testImplicits._
+  import spark._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+dropTables("partitioned_parquet",
+  "partitioned_parquet_with_key",
+  "partitioned_parquet_with_complextypes",
+  "partitioned_parquet_with_key_and_complextypes",
+  "normal_parquet")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDir.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_key
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithKey.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW normal_parquet
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${new File(partitionedTableDir, "p=1").toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
+  )
+""")
+
+sql( s"""
+  CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
+  USING org.apache.spark.sql.parquet
+  OPTIONS (
+path '${partitionedTableDirWithComplexTypes.toURI}'
+  )
+""")
+  }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+sql("drop table if exists spark_6016_fix")
--- End diff --

This case can be wrapped with `withTable("spark_6016_fix")`
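
For reference, a sketch of what that wrapping could look like (assuming the suite's existing imports and the `withTable` helper from `SQLTestUtils`):

```scala
test("SPARK-6016 make sure to use the latest footers") {
  // withTable drops the table at the end of the block even if the body fails,
  // replacing the manual "drop table if exists" / "drop table" pair.
  withTable("spark_6016_fix") {
    val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
    checkAnswer(sql("select * from spark_6016_fix"), (1 to 10).map(i => Row(i)))
  }
}
```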


---




[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...

2018-09-19 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22467#discussion_r218835682
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala 
---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+// The data where the partitioning key exists only in the directory 
structure.
+case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+intField: Int,
+stringField: String,
+structField: StructContainer,
+arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+p: Int,
+intField: Int,
+stringField: String,
+structField: StructContainer,
+arrayField: Seq[Int])
+
+/**
+ * A collection of tests for parquet data with various forms of 
partitioning.
+ */
+abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
+  import testImplicits._
+
+  var partitionedTableDir: File = null
+  var normalTableDir: File = null
+  var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+partitionedTableDir = Utils.createTempDir()
+normalTableDir = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDir, s"p=$p")
+  sparkContext.makeRDD(1 to 10)
+.map(i => ParquetData(i, s"part-$p"))
+.toDF()
+.write.parquet(partDir.getCanonicalPath)
+}
+
+sparkContext
+  .makeRDD(1 to 10)
+  .map(i => ParquetData(i, s"part-1"))
+  .toDF()
+  .write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
+
+partitionedTableDirWithKey = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+  sparkContext.makeRDD(1 to 10)
+.map(i => ParquetDataWithKey(p, i, s"part-$p"))
+.toDF()
+.write.parquet(partDir.getCanonicalPath)
+}
+
+partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, 
s"p=$p")
+  sparkContext.makeRDD(1 to 10).map { i =>
+ParquetDataWithKeyAndComplexTypes(
+  p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+  }.toDF().write.parquet(partDir.getCanonicalPath)
+}
+
+partitionedTableDirWithComplexTypes = Utils.createTempDir()
+
+(1 to 10).foreach { p =>
+  val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+  sparkContext.makeRDD(1 to 10).map { i =>
+ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, 
f"${i}_string"), 1 to i)
+  }.toDF().write.parquet(partDir.getCanonicalPath)
+}
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  partitionedTableDir.delete()
+  normalTableDir.delete()
+  partitionedTableDirWithKey.delete()
+  partitionedTableDirWithComplexTypes.dele

[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r218660634
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

Yep, sorry for the late reply. :(


---




[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
@maropu @dongjoon-hyun Many thanks for your guidance!
```
Apache Spark already supports changing column types as a part of schema 
evolution. Especially, ORC vectorized reader support upcasting although it's 
not the same with canCast.

For the detail support Spark coverage, see SPARK-23007. It covered all 
built-in data source at that time.
```
Thanks, I'll study this background soon.
```
Please note that every data sources have different capability. So, this PR 
needs to prevent ALTER TABLE CHANGE COLUMN for those data sources case-by-case. 
And, we need corresponding test cases.
```
Got it. I'll keep following the cases in this PR; I've roughly split the work into 4 tasks and will update this PR's description first. I'll pay attention to the corresponding test cases in each task.


---




[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22381
  
retest this please.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
retest this please.


---




[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22381
  
UT fixed by #22452.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
UT fixed by #22452.
retest this please.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
retest this please.


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r218328998
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -84,20 +88,20 @@ private[spark] class BarrierCoordinator(
 
   /**
* Provide the current state of a barrier() call. A state is created 
when a new stage attempt
-   * sends out a barrier() call, and recycled on stage completed.
+   * sends out a barrier() call, and recycled on stage completed. Visible 
for testing.
*
* @param barrierId Identifier of the barrier stage that make a 
barrier() call.
* @param numTasks Number of tasks of the barrier stage, all barrier() 
calls from the stage shall
* collect `numTasks` requests to succeed.
*/
-  private class ContextBarrierState(
+  private[spark] class ContextBarrierState(
   val barrierId: ContextBarrierId,
   val numTasks: Int) {
 
 // There may be multiple barrier() calls from a barrier stage attempt, 
`barrierEpoch` is used
 // to identify each barrier() call. It shall get increased when a 
barrier() call succeeds, or
 // reset when a barrier() call fails due to timeout.
-private var barrierEpoch: Int = 0
+private[spark] var barrierEpoch: Int = 0
--- End diff --

Makes sense, done in ec8466a.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r218324420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for your advice!
I've looked into this over the last few days. With the current implementation, all behavior complies with Hive (the type change is supported, works well in non-binary formats, and raises an exception in binary formats like ORC and Parquet). Is it OK to add a config to constrain this?

Adding logic to cast input data to the changed type in the catalog may require modifying four code paths, including the vectorized and row readers for Parquet and ORC. If we don't agree with the current behavior, I'll keep following up on these.

Item | Behavior
---- | --------
Parquet Row Reader | `ClassCastException` in `SpecificInternalRow.set${Type}`
Parquet Vectorized Reader | `SchemaColumnConvertNotSupportedException` in `VectorizedColumnReader.read${Type}Batch`
ORC Row Reader | `ClassCastException` in `OrcDeserializer.newWriter`
ORC Vectorized Reader | `NullPointerException` in the `OrcColumnVector` get-value-by-type methods
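
To make the table above concrete, a standalone sketch of the binary-format failure mode (the path is made up, and this uses a read-time schema rather than the exact ALTER TABLE flow of this PR):

```scala
import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("typeChangeSketch").getOrCreate()
import spark.implicits._

// Write Parquet files with an Int column, then read them back as Long.
val path = "/tmp/type_change_sketch"   // made-up path
Seq(1, 2, 3).toDF("a").write.mode("overwrite").parquet(path)

val widened = StructType(Seq(StructField("a", LongType)))
// The vectorized Parquet reader cannot upcast int32 to bigint here, so the scan
// fails with SchemaColumnConvertNotSupportedException (as in the table above).
println(Try(spark.read.schema(widened).parquet(path).collect()))
```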


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-17 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
gentle ping @jiangxb1987


---




[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-09-15 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
retest this please.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r217677591
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -205,6 +205,34 @@ class FileBasedDataSourceSuite extends QueryTest with 
SharedSQLContext with Befo
 }
   }
 
+  allFileBasedDataSources.foreach { format =>
--- End diff --

@kiszk Added a UT in FileBasedDataSourceSuite as we discussed in https://github.com/apache/spark/pull/21618#discussion_r216156188.


---




[GitHub] spark pull request #22402: [SPARK-25414][SS] The numInputRows metrics can be...

2018-09-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22402#discussion_r217440426
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -460,9 +460,9 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 val streamingInputDF = 
createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
 
 val progress = 
getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
-assert(progress.numInputRows === 20) // data is read multiple times in 
self-joins
--- End diff --

So IIUC, in this line `EXCHANGE_REUSE_ENABLED == true`, and it's not actually read twice?


---




[GitHub] spark pull request #22412: [SPARK-25404][SQL] Staging path may not on the ex...

2018-09-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22412#discussion_r217431206
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -217,12 +217,7 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 val inputPathUri: URI = inputPath.toUri
 val inputPathName: String = inputPathUri.getPath
 val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-var stagingPathName: String =
--- End diff --

This code was introduced in https://github.com/apache/spark/pull/12770; did you check the related UTs? Maybe the UTs relating to SessionState and DDLSuite.


---




[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22386
  
Many thanks for your comment and fix, @mukulmurthy! We'll also port this soon.


---




[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22386#discussion_r216730172
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
 ---
@@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
 // If we're in continuous processing mode, we should get the store 
version for the current
 // epoch rather than the one at planning time.
-val currentVersion = EpochTracker.getCurrentEpoch match {
-  case None => storeVersion
-  case Some(value) => value
+val isContinuous = 
Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
--- End diff --

Is a simple `toBoolean` enough here? Because you set the default value on both the MicroBatch and Continuous sides.
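
A small sketch of the suggested simplification (the helper below is illustrative; the constant is the one referenced in the diff above):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution

// If both the micro-batch and continuous sides always set this local property
// (as the defaults in this PR do), the Option wrapper and pattern match can be
// replaced by a plain toBoolean.
def isContinuousProcessing(ctxt: TaskContext): Boolean =
  ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING).toBoolean
```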


---




[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22386
  
```
If a continuous processing job runs and the same thread gets reused for a 
microbatch execution job in the same environment
```
I'm a little confused about this scenario, could you explain more?


---




[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22386#discussion_r216723459
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
 false))
   }
 
+  test("is_continuous_processing property should be false for microbatch 
processing") {
+val input = MemoryStream[Int]
+val df = input.toDS()
+  .map(i => 
TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+testStream(df) (
+  AddData(input, 1),
+  CheckAnswer("false")
+)
+  }
+
+  test("is_continuous_processing property should be true for continuous 
processing") {
+val input = ContinuousMemoryStream[Int]
+var x: String = ""
--- End diff --

unused?


---




[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22386#discussion_r216723249
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
 false))
   }
 
+  test("is_continuous_processing property should be false for microbatch 
processing") {
+val input = MemoryStream[Int]
+val df = input.toDS()
+  .map(i => 
TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+testStream(df) (
+  AddData(input, 1),
+  CheckAnswer("false")
+)
+  }
+
+  test("is_continuous_processing property should be true for continuous 
processing") {
+val input = ContinuousMemoryStream[Int]
+var x: String = ""
--- End diff --

Unused var?


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attributes f...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22326
  
@cloud-fan Thanks for your comment.
```
IIUC, you are pulling out the join condition with python UDF and create a 
filter above join. Then the join become a cross join, which usually runs very 
slowly.
```
Yes, that's right.
```
I think we should keep the cross join check for this case.
```
Yes, as Marco suggested, the current behavior is to control cross joins via the `crossJoinEnabled` config: if `crossJoinEnabled = false`, it will throw an AnalysisException.
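
For reference, a hedged, standalone illustration of that check (`spark.sql.crossJoin.enabled` is the SQL config behind `crossJoinEnabled` in Spark 2.x; the join below simply has no usable condition, just to trigger the same code path as the pulled-out UDF condition):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("crossJoinCheck").getOrCreate()
import spark.implicits._

// Keep the cross-join check on: implicit cartesian products are rejected.
spark.conf.set("spark.sql.crossJoin.enabled", "false")

val left = Seq(1, 2).toDF("a")
val right = Seq(3, 4).toDF("b")
try {
  // No constraining join condition, so Spark detects an implicit cartesian
  // product and fails instead of silently running the expensive cross join.
  left.join(right).show()
} catch {
  case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
}
```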


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216606555
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Thanks @mgaido91 for the detailed review and advice. For my part, I would probably limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for others' advice.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216600156
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

```
Although the query above doesn't work well, why do users change column 
types?
```
As in the scenario described above, a user first uses Int, but after some time finds that a Long is needed; they can rewrite the new data as Long and load it into new partitions. If we don't support the type change, the user would have to recreate the table just for this change.

Yep, if it's not a binary format, the query works OK.
```
Logging initialized using configuration in 
jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT);
OK
Time taken: 2.576 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);;
Query ID = XuanYuan_20180911164348_32238a6c-b0a4-4cfd-aa3d-00a7628031cf
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2018-09-11 16:43:51,684 Stage-1 map = 100%,  reduce = 0%
Ended Job = job_local162423_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: 
file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-11_16-43-48_117_2262603440504094412-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=6, rawDataSize=5]
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.025 seconds
hive> select * from t;;
OK
1   a   3
Time taken: 0.164 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.177 seconds
hive> select * from t;
OK
1   a   3
Time taken: 0.12 seconds, Fetched: 1 row(s)
hive> quit;
```


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216588353
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold which can be used glob path in 
parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash ecape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List getPathComponents(String path)
+  throws IOException {
+ArrayList ret = new ArrayList();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+ 

[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216587584
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala ---
@@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with 
Matchers {
 })
   }
 
+  test("test expanding glob path") {
--- End diff --

No problem, I'll fix this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216585000
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

From the code in `canEvaluateWithinJoin`, we get the scope relation: (CannotEvaluateWithinJoin = nondeterministic + Unevaluable) > Unevaluable > PythonUDF.
So, to be safe, maybe I should limit the change scope to the smallest one, PythonUDF only. Need some advice from you, thanks :)



https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L104-L120
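
For intuition only, a conceptual sketch of that containment (this is not the real `canEvaluateWithinJoin` linked above, just the relationship the comment relies on):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}

// Conceptual sketch: expressions that cannot be evaluated within a join include
// every nondeterministic expression plus every Unevaluable one, and PythonUDF is
// just one particular Unevaluable expression. Restricting the new behavior to
// PythonUDF therefore touches the smallest of the three sets.
def cannotEvaluateWithinJoin(e: Expression): Boolean =
  !e.deterministic || e.isInstanceOf[Unevaluable]
```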


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216583509
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Yes, I changed this to make the test pass. The original thought was that nondeterministic expressions in join conditions are not supported yet, so it's not a big problem.
https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L105
https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1158-L1159
But now I think I should be more careful about this and limit the cross-join change to the PythonUDF case only. WDYT @mgaido91? Thanks.



---




[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/22369


---




[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22369
  
As per the comment in https://github.com/apache/spark/pull/22140#issuecomment-419997180, I think this doc change is no longer needed, so I'll just close this. Thanks @BryanCutler and @HyukjinKwon!


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377882
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala 
---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class AppStatusSource extends Source{
--- End diff --

nit: Source {


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216378185
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

nit: `: AppStatusStore`?


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377621
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): 
Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
+  }
+  else {
--- End diff --

nit: } else {


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377526
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
 setupAndStartListenerBus()
 postEnvironmentUpdate()
+_env.metricsSystem.registerSource(appStatusSource)
--- End diff --

Better to move this line to +574.


---




[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22369#discussion_r216189359
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1901,6 +1901,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above
 
   - As of version 2.3.1 Arrow functionality, including `pandas_udf` and 
`toPandas()`/`createDataFrame()` with `spark.sql.execution.arrow.enabled` set 
to `True`, has been marked as experimental. These are still evolving and not 
currently recommended for use in production.
+  - In version 2.3.1 and earlier, it is possible for PySpark to create a 
Row object by providing more value than column number through the customized 
Row class. Since Spark 2.3.3, Spark will confirm value length is less or equal 
than column length in PySpark. See 
[SPARK-25072](https://issues.apache.org/jira/browse/SPARK-25072) for details.
--- End diff --

Thanks Bryan, I'll address this after discussion.


---




[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...

2018-09-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22369
  
Got it, thanks @HyukjinKwon.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147915
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion.")
+  .intConf
+  .checkValue(threshold => threshold >= 0, "The maximum number of 
subfiles or directories " +
--- End diff --

Maybe we should keep this public, because the parallel path is only enabled when the thread number is > 0.
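
As a usage sketch (the config keys below are the ones proposed in this PR, not part of a released Spark version, and the glob path is made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("parallelGlobSketch").getOrCreate()

// Parallel glob expansion only kicks in when numThreads > 0, so the threshold
// is only consulted once the feature is explicitly enabled.
spark.conf.set("spark.sql.sources.parallelGetGlobbedPath.numThreads", "8")
spark.conf.set("spark.sql.sources.parallelGetGlobbedPath.threshold", "32")

// A read whose path contains wildcards would then expand them using the pool:
// spark.read.text("/data/logs/year=*/month=*/*.txt")
```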


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147921
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
--- End diff --

Thanks, done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147919
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
+   */
+  private def getGlobbedPaths(
--- End diff --

Thanks, that's clearer; done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
+   */
+  private def getGlobbedPaths(
+  sparkSession: SparkSession,
--- End diff --

Thanks for the advice, done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147887
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1557,6 +1576,15 @@ class SQLConf extends Serializable with Logging {
   def parallelPartitionDiscoveryParallelism: Int =
 getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
 
+  def parallelGetGlobbedPathThreshold: Int =
+getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_THRESHOLD)
+
+  def parallelGetGlobbedPathNumThreads: Int =
+getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_NUM_THREADS)
+
+  def parallelGetGlobbedPathEnabled: Boolean =
--- End diff --

Thanks, done in 1319cd3.


---




[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
```
@xuanyuanking Could you please update the document?
```
#22369. Thanks for the reminder; I'll pay attention to this in future work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-08 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22369

[SPARK-25072][DOC] Update migration guide for behavior change

## What changes were proposed in this pull request?

Update the document for the behavior change in PySpark Row creation.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25072-DOC

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22369


commit d257a38c647b45a9e83a2bdbbd2814f1b3fc5d56
Author: Yuanjian Li 
Date:   2018-09-09T04:26:23Z

Update doc for SPARK-25072




---




[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
@kiszk @maropu Many thanks for your review and advice! I'll address the comments and resolve the conflicts ASAP.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216133261
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala ---
@@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with 
Matchers {
 })
   }
 
+  test("test expanding glob path") {
--- End diff --

```
IIUC, the new feature is disabled as default since 
spark.sql.sources.parallelGetGlobbedPath.numThreads is 0.
```
Yes that's right.

```
I am afraid these test causes are executed only with disabling the new 
feature.
```
These mainly test the correctness of `sparkHadoopUtil.expandGlobPath`, 
so it may be necessary to keep them.
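
For illustration, a self-contained sketch of the kind of correctness check these tests perform, written against Hadoop's public glob API (the real suite exercises `sparkHadoopUtil.expandGlobPath`, whose exact signature is not shown here):

```scala
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Build a tiny directory tree and check that a glob pattern expands to the
// expected concrete paths; the expectation is the same whether or not the
// parallel code path (numThreads > 0) is enabled.
val tempDir = Files.createTempDirectory("glob-test").toFile
Seq("part=1", "part=2").foreach(d => new java.io.File(tempDir, d).mkdirs())

val fs = FileSystem.getLocal(new Configuration())
val matched = fs.globStatus(new Path(s"${tempDir.getAbsolutePath}/part=*"))
  .map(_.getPath.getName)
  .sorted

assert(matched.sameElements(Array("part=1", "part=2")))
```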


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216132926
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold, which can be used to glob paths in parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash escape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("\\\\(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List<String> getPathComponents(String path)
+  throws IOException {
+ArrayList<String> ret = new ArrayList<String>();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+ 

[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216132779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

It also doesn't work in Hive; I tested with Hive 1.2.2 and Hadoop 2.7.3.
```
Logging initialized using configuration in 
jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT) stored as parquet;
OK
Time taken: 1.604 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);
Query ID = XuanYuan_20180908230549_3c8732ff-07e0-4a7a-95b4-260aed04a762
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
2018-09-08 23:06:08,732 Stage-1 map = 0%,  reduce = 0%
2018-09-08 23:06:09,743 Stage-1 map = 100%,  reduce = 0%
Ended Job = job_local712899233_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: 
file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-08_23-05-49_782_100109481692677607-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=343, rawDataSize=3]
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 20.294 seconds
hive> select * from t;
OK
1   a   3
Time taken: 0.154 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.18 seconds
hive> select * from t;
OK
Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.IntWritable
Time taken: 0.116 seconds
hive>
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -547,6 +547,74 @@ def test_udf_in_filter_on_top_of_join(self):
 df = left.crossJoin(right).filter(f("a", "b"))
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"))
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+def test_udf_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"), "leftsemi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_filter_in_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, 
b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, 
b1=1, b2=2)])
+
+def test_udf_and_filter_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, 
b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], 
"left_semi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_common_filter_in_join_condition(self):
--- End diff --

Added these two tests for the comment in 
https://github.com/apache/spark/pull/22326#discussion_r216127673.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216127904
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

So maybe we should also add a conf like 
`MetastoreConf.ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES` in Hive to wrap 
this behavior? WDYT? Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216127880
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for the checking; my mistake for not describing the intention behind 
this feature. In supporting type changes we just want to add the ability to 
change the column type metadata. The scenario we met is that a user wants a 
type change (for example int is not enough and a long type is needed); they have 
already changed the type in their data files, but today we have to hack the 
metastore or recreate the whole table.
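
Concretely, the workflow this is meant to unblock looks like the following (a hedged sketch assuming a spark-shell style `spark` session; the table and column names are made up):

```scala
// The user has already rewritten the data files with the wider type; the
// ALTER below should only need to update the column metadata in the metastore
// instead of forcing a full table recreation.
spark.sql("CREATE TABLE events (id INT, payload STRING) USING parquet")
spark.sql("ALTER TABLE events CHANGE COLUMN id id BIGINT")
```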


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22326
  
@holdenk Thanks, sorry for the typo.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127710
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
-  // push down the single side only join filter for both sides sub 
queries
-  val newLeft = leftJoinConditions.
-reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-  val newRight = rightJoinConditions.
-reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+case LeftSemi =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
--- End diff --

Could I try to answer this? The projection is only used for a left semi join 
after the cross join in this scenario, to ensure the output only contains 
left-side attributes.
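
A minimal sketch of the resulting plan shape, written with catalyst's test DSL as in `FilterPushdownSuite` (the `'a === Rand(10)` predicate is only a stand-in for an unevaluable condition such as a Python UDF):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Rand
import org.apache.spark.sql.catalyst.plans.Cross
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val left = LocalRelation('a.int)
val right = LocalRelation('b.int)

// Left semi join with an unevaluable condition: the condition becomes a Filter
// on top of a Cross join, and the final Project keeps only the left-side
// attributes so the result still behaves like a semi join.
val rewrittenShape =
  left
    .join(right, joinType = Cross, condition = None)
    .where('a === Rand(10))
    .select('a)
```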


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127673
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
-  // push down the single side only join filter for both sides sub 
queries
-  val newLeft = leftJoinConditions.
-reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-  val newRight = rightJoinConditions.
-reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+case LeftSemi =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
+  } else {
+join
+  }
+case _: InnerLike =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // only need to add cross join when whole commonJoinCondition 
are unevaluable
+  val newJoinType = if (commonJoinCondition.nonEmpty && 
newJoinCond.isEmpty) {
--- End diff --

Thanks, after detailed checking I changed this to `others.nonEmpty`; the previous 
condition may have been an unnecessary worry about commonJoinCondition containing 
both unevaluable and evaluable conditions. Also added a test in the next commit 
to ensure this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127605
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ 
nonDeterministic)
   }
 
+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: 
LogicalPlan) = {
+if (SQLConf.get.crossJoinEnabled) {
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of 
the join " +
+"plan is unevaluable, it will be ignored and the join plan will be 
" +
+s"turned to cross join. This plan shows below:\n $j")
+  Cross
+} else {
+  // if the crossJoinEnabled is false, an AnalysisException will throw 
by
+  // CheckCartesianProducts, we throw firstly here for better readable 
information.
+  throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+s"$commonJoinCondition of the join plan is unevaluable, we need to 
cast the " +
+"join to cross join by setting the configuration variable " +
+s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+}
+  }
+
+  /**
+   * Generate new left and right child of join by pushing down the side 
only join filter,
+   * split commonJoinCondition based on the expression can be evaluated 
within join or not.
+   *
+   * @return (newLeftChild, newRightChild, newJoinCondition, 
conditionCannotEvaluateWithinJoin)
--- End diff --

Got it, just saw the demo here 
https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140,
 will remove it in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216122599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for your question; that is actually also what I considered while doing 
the compatibility check. Hive does this column type change work in 
[HiveAlterHandler](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L175
) and the detailed compatibility check is in 
[ColumnType](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ColumnType.java#L206).
 You can see that the ColumnType check actually uses `canCast` 
semantics to judge compatibility.
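
For illustration, a Spark-side compatibility check in the same spirit could be sketched with catalyst's `Cast.canCast` (this is not the PR's code, just a rough equivalent of the Hive-side check):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types.StructField

// Reject a column type change unless the original type can be cast to the new
// one, mirroring the canCast-style compatibility check Hive does in ColumnType.
def checkColumnTypeChange(origin: StructField, updated: StructField): Unit = {
  if (origin.dataType != updated.dataType &&
      !Cast.canCast(origin.dataType, updated.dataType)) {
    throw new IllegalArgumentException(
      s"Cannot change column '${origin.name}' from ${origin.dataType} to ${updated.dataType}")
  }
}
```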


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215877417
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
 
 // CheckAnalysis will ensure nondeterministic expressions not appear 
in join condition.
 // TODO support nondeterministic expressions in join condition.
-comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
-  checkAnalysis = false)
+withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+  comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
+checkAnalysis = false)
+}
+  }
+
+  test("join condition pushdown: deterministic and non-deterministic in 
left semi join") {
--- End diff --

I didn't add SPARK-25314 to the name because it is more of a supplement to 
test("join condition pushdown: deterministic and non-deterministic").


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876606
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
+case LeftSemi =>
   // push down the single side only join filter for both sides sub 
queries
   val newLeft = leftJoinConditions.
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
+  } else {
+join
+  }
+case _: InnerLike =>
+  // push down the single side only join filter for both sides sub 
queries
+  val newLeft = leftJoinConditions.
--- End diff --

No problem, done in 87440b0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ 
nonDeterministic)
   }
 
+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: 
LogicalPlan) = {
+if (SQLConf.get.crossJoinEnabled) {
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of 
the join " +
+"plan is unevaluable, it will be ignored and the join plan will be 
" +
+s"turned to cross join. This plan shows below:\n $j")
+  Cross
+} else {
+  // if the crossJoinEnabled is false, an AnalysisException will throw 
by
+  // [[CheckCartesianProducts]], we throw firstly here for better 
readable
--- End diff --

Thanks, done in 87440b0. I'll also pay attention to this in future work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215860035
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for the advice, I should also check type compatibility; added in ef65c4d.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859851
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -1697,6 +1697,16 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
 sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is 
col1'")
 assert(getMetadata("col1").getString("key") == "value")
 assert(getMetadata("col1").getString("comment") == "this is col1")
+
+// Ensure that changing column type takes effect
+sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING")
+val column = 
catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1")
+assert(column.get.dataType == StringType)
+
+// Ensure that changing partition column type throw exception
+intercept[AnalysisException] {
+  sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING")
+}
--- End diff --

Thanks, done in ef65c4d. Also added a check for type compatibility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859764
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
   throw new AnalysisException(
 "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
   s"'${originColumn.name}' with type '${originColumn.dataType}' to 
" +
   s"'${newColumn.name}' with type '${newColumn.dataType}'")
--- End diff --

After adding the type check, maybe we also need the type information in the 
error message.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859681
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
--- End diff --

Thanks, it was not enough yet; added a type compatibility check in ef65c4d.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22326
  
Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal, many thanks for the advice, 
please have a look when you have time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
Thanks @BryanCutler @HyukjinKwon !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215636283
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,9 @@ private[spark] class BarrierCoordinator(
   requesters.clear()
   cancelTimerTask()
 }
+
+// Check for clearing internal data, visible for test only.
+private[spark] def cleanCheck(): Boolean = requesters.isEmpty && 
timerTask == null
--- End diff --

Added the internal data cleanup check per Xingbo's comment: 
https://github.com/apache/spark/pull/22165#issuecomment-415086679.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215635587
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext 
{
+
+  /**
+   * Get the current barrierEpoch from barrierCoordinator.states by 
ContextBarrierId
+   */
+  def getCurrentBarrierEpoch(
+  stageId: Int, stageAttemptId: Int, barrierCoordinator: 
BarrierCoordinator): Int = {
+val barrierId = ContextBarrierId(stageId, stageAttemptId)
+barrierCoordinator.states.get(barrierId).barrierEpoch
+  }
+
+  test("normal test for single task") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val stageId = 0
+val stageAttemptNumber = 0
+rpcEndpointRef.askSync[Unit](
+  message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, 
taskAttemptId = 0,
+barrierEpoch = 0),
+  timeout = new RpcTimeout(5 seconds, "rpcTimeOut"))
+// sleep for waiting barrierEpoch value change
+Thread.sleep(500)
--- End diff --

Thanks for the guidance, done in ecf12bd. I'll also pay attention to this in 
future work.
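
Presumably the fix replaces the fixed sleep with a condition-based wait; a minimal sketch of that pattern with ScalaTest's `Eventually` (the expected epoch value is illustrative only):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll until the coordinator has advanced the barrier epoch instead of
// sleeping for a fixed amount of time.
eventually(timeout(10.seconds), interval(100.millis)) {
  assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, barrierCoordinator) == 1)
}
```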


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215635071
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator(
 
   // Record all active stage attempts that make barrier() call(s), and the 
corresponding internal
   // state.
-  private val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
+  private[spark] val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
--- End diff --

No problem, done in ecf12bd.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601543
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -269,6 +269,10 @@ def test_struct_field_type_name(self):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(slef):
+rowClass = Row("c1", "c2")
--- End diff --

Thanks, done in eb3f506.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601486
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1397,6 +1397,8 @@ def _create_row_inbound_converter(dataType):
 
 
 def _create_row(fields, values):
+if len(values) > len(fields):
+raise ValueError("Can not create %s by %s" % (fields, values))
--- End diff --

Thanks, improved and moved this check to `__call__` in `Row`: eb3f506.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601350
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -269,6 +269,10 @@ def test_struct_field_type_name(self):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(slef):
--- End diff --

Thanks, done in eb3f506.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22341#discussion_r215318105
  
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = 
SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
--- End diff --

Thanks for explaining.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22341#discussion_r215285275
  
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = 
SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
--- End diff --

Is this related to the problem?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215271726
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

```
 I am a bit surprised that works, it would be great to understand why. 
Thanks.
```
Sorry for the bad test; that case was too special and the result was only right by 
accident. The original implementation makes all semi joins return `[]` in PySpark.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
Gentle ping @maropu, could you help review this? I'll keep following up 
on it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
Gentle ping @cloud-fan @gatorsmile @kiszk, we still hit this in our internal 
fork; could you help review? I'll resolve the conflicts, many thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
Gentle ping @HyukjinKwon @BryanCutler 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
```
Could I do the refactor of moving ContextBarrierState out of 
BarrierCoordinator?
```
Gentle ping @jiangxb1987, I'm still following up on this. :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215261610
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

Thanks @mgaido91 and @dilipbiswal!
I fixed this in 63fbcce. The main problem is a semi join with both 
deterministic and non-deterministic conditions: placing the filter after the 
semi join will fail. Also added more tests on both the Python and Scala side, 
including semi join, inner join, and the complex scenario described below.
Handling left semi makes the strategy difficult to read, so in 
63fbcce I split the logic for semi join and inner join.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214974819
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

Thanks, I'll do more testing on the semi join here, but based on the current 
tests over PySpark this is not wrong; maybe I misunderstood what you two mean 
by `wrong`: do you mean correctness, or just a benchmark regression?

![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214969191
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Filter(others.reduceLeft(And), join)
--- End diff --

Thanks, there is no need to add an extra Filter in the LeftSemi case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968900
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
--- End diff --

Thanks, done in 82e50d5.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
--- End diff --

Makes sense; also changed this in `CheckCartesianProducts`. Done in 82e50d5.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214932266
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
--- End diff --

Thanks, done in a86a7d5.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214931484
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
+  Cross
+} else joinType
--- End diff --

Thanks, done in a86a7d5.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214841211
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

```
yes, and if the plan is big, than this would become quite unreadable IMHO. 
I think it would be better to refactor the message and put the plan at the end.
```
@mgaido91 Thanks for your advice, I will do the refactor in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214840994
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@dilipbiswal Thanks for your detailed check, I should have written a more typical case. 
The scenario we want to solve here is a UDF accessing attributes from both sides of 
the join, so I'll change the case to `dummyPythonUDF(col("a"), col("c")) === 
dummyPythonUDF(col("d"), col("c"))` in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214823799
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

The log will look like this:
```
16:13:35.218 WARN 
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole 
commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the 
join plan:
 Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))
:- LocalRelation [a#5, b#6]
+- LocalRelation [c#14, d#15]
 is unevaluable, it will be ignored and the join plan will be turned to 
cross join.
```
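
A quick way to see this rewrite from the user side is to print the query plans; a hedged sketch (the exact plan text varies by Spark version):

```
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.crossJoin.enabled", "true")
left = spark.createDataFrame([Row(a=1, b=2)])
right = spark.createDataFrame([Row(c=1, d=2)])
eq = udf(lambda x, y: x == y, BooleanType())

df = left.join(right, eq("a", "c"))
# explain(True) prints the parsed, analyzed, optimized, and physical plans;
# after the rewrite one would expect a cartesian/cross join with the Python
# UDF evaluated in a Filter above it, matching the warning quoted above.
df.explain(True)
```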


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214823548
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

Thanks for the reminder, `crossJoinEnabled` should be checked here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807566
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
 ---
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest 
with SharedSQLContext {
 }
 assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in 
join condition") {
--- End diff --

Got it, I will add it in this commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807480
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross 
else joinType
--- End diff --

Makes sense, I'll leave a warning here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807200
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
--- End diff --

Ditto, the correct test is `df = left.join(right, f("a", "b"))`.
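
Spelled out, the intended regression test looks roughly like this; a sketch of the intent, not the exact committed code:

```
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.master("local[1]").getOrCreate()
# Depending on the version, the rewrite may require cross joins to be enabled.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())

# Join directly on the Python UDF; the crossJoin + filter variant is already
# covered by test_udf_in_filter_on_top_of_join.
df = left.join(right, f("a", "b"))
assert df.collect() == [Row(a=1, b=1)]
```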


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214806996
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
--- End diff --

Yep, sorry for the mess here, a leftover commit slipped in. I'll fix it soon; how do I 
cancel the test?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805785
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
--- End diff --

Sorry for the mess here, I'm changing it now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-03 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22326

[SPARK-25314][SQL] Fix Python UDF accessing attributes from both sides of 
join in join conditions

## What changes were proposed in this pull request?

Thanks to @bahchis for reporting this. It is more of a follow-up to #16581; this 
PR fixes the scenario of a Python UDF accessing attributes from both sides of a 
join in the join condition.

## How was this patch tested?

Add regression tests in PySpark and `BatchEvalPythonExecSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25314

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22326


commit 23b10283500b18462a71eb525d4762dd33c3d4fa
Author: Yuanjian Li 
Date:   2018-09-04T06:43:14Z

Fix Python UDF accessing attributes from both sides of join in join conditions




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22313: [SPARK-25306][SQL] Use cache to speed up `createF...

2018-09-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22313#discussion_r214528435
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
@@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
+  case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, 
DataType])
+
+  private lazy val cacheExpireTimeout =
+
org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
+
+  private lazy val searchArgumentCache = CacheBuilder.newBuilder()
+.expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
+.build(
+  new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
+override def load(typeMapAndFilter: FilterWithTypeMap): 
Option[Builder] = {
+  buildSearchArgument(
+typeMapAndFilter.typeMap, typeMapAndFilter.filter, 
SearchArgumentFactory.newBuilder())
+}
+  })
+
+  private def getOrBuildSearchArgumentWithNewBuilder(
--- End diff --

Just a small question: is it possible to reuse code with 
https://github.com/apache/spark/pull/22313/files#diff-224b8cbedf286ecbfdd092d1e2e2f237R61?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


