[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r219675105

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -995,7 +995,8 @@ class Dataset[T] private[sql](
     // After the cloning, left and right side will have distinct expression ids.
     val plan = withPlan(
       Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
-      .queryExecution.analyzed.asInstanceOf[Join]
+      .queryExecution.analyzed
+    val joinPlan = plan.collectFirst { case j: Join => j }.get
--- End diff --

For reviewers: we need this change because the rule `HandlePythonUDFInJoinCondition` breaks the assumption that analyzing a join plan always returns a `Join`. Once the rule for handling Python UDFs is added, a `Filter` or `Project` node may be placed on top of the `Join`.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
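A minimal sketch of why a tree search is more robust than a direct cast here. It uses a hypothetical miniature plan tree (`JoinLookupSketch`, `Relation`, `Join`, `Filter`, `Project` are illustrative stand-ins, not Catalyst classes): once a `Filter` or `Project` sits above the join, the cast fails while a pre-order `collectFirst` still finds the `Join`.

```scala
object JoinLookupSketch {
  // Hypothetical miniature plan tree; these are NOT Spark's Catalyst classes.
  sealed trait Plan {
    def children: Seq[Plan]

    // Pre-order search returning the first node matched by `pf`,
    // similar in spirit to TreeNode.collectFirst.
    def collectFirst[B](pf: PartialFunction[Plan, B]): Option[B] =
      pf.lift(this).orElse(
        children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(b) => b })
  }
  case class Relation(name: String) extends Plan { def children: Seq[Plan] = Nil }
  case class Join(left: Plan, right: Plan, condition: Option[String]) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }
  case class Filter(condition: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  case class Project(columns: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // What an analyzed plan might look like after a rule places nodes above the join.
  val analyzed: Plan =
    Project(Seq("a", "b"), Filter("pyUDF(a, b)", Join(Relation("l"), Relation("r"), None)))

  // `analyzed.asInstanceOf[Join]` would throw a ClassCastException here;
  // searching the tree still finds the Join.
  val joinPlan: Join = analyzed.collectFirst { case j: Join => j }.get
}
```

The `.get` mirrors the diff: it assumes a `Join` is always present somewhere under the analyzed plan.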
[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22467#discussion_r218835749 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils + +// The data where the partitioning key exists only in the directory structure. 
+case class ParquetData(intField: Int, stringField: String) +// The data that also includes the partitioning key +case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) + +case class StructContainer(intStructField: Int, stringStructField: String) + +case class ParquetDataWithComplexTypes( +intField: Int, +stringField: String, +structField: StructContainer, +arrayField: Seq[Int]) + +case class ParquetDataWithKeyAndComplexTypes( +p: Int, +intField: Int, +stringField: String, +structField: StructContainer, +arrayField: Seq[Int]) + +/** + * A collection of tests for parquet data with various forms of partitioning. + */ +abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton { + import testImplicits._ + + var partitionedTableDir: File = null + var normalTableDir: File = null + var partitionedTableDirWithKey: File = null + var partitionedTableDirWithComplexTypes: File = null + var partitionedTableDirWithKeyAndComplexTypes: File = null + + override def beforeAll(): Unit = { +super.beforeAll() +partitionedTableDir = Utils.createTempDir() +normalTableDir = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) +.map(i => ParquetData(i, s"part-$p")) +.toDF() +.write.parquet(partDir.getCanonicalPath) +} + +sparkContext + .makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-1")) + .toDF() + .write.parquet(new File(normalTableDir, "normal").getCanonicalPath) + +partitionedTableDirWithKey = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKey, s"p=$p") + sparkContext.makeRDD(1 to 10) +.map(i => ParquetDataWithKey(p, i, s"part-$p")) +.toDF() +.write.parquet(partDir.getCanonicalPath) +} + +partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map 
{ i => +ParquetDataWithKeyAndComplexTypes( + p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) +} + +partitionedTableDirWithComplexTypes = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => +ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) +} + } + + override protected def afterAll(): Unit = { +try { + partitionedTableDir.delete() + normalTableDir.delete() + partitionedTableDirWithKey.delete() + partitionedTableDirWithComplexTypes.dele
[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22467#discussion_r218844704 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * A suite of tests for the Parquet support through the data sources API. 
+ */ +class HiveParquetSourceSuite extends ParquetPartitioningTest { + import testImplicits._ + import spark._ + + override def beforeAll(): Unit = { +super.beforeAll() +dropTables("partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes", + "normal_parquet") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDir.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithKey.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW normal_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${new File(partitionedTableDir, "p=1").toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithComplexTypes.toURI}' + ) +""") + } + + test("SPARK-6016 make sure to use the latest footers") { +sql("drop table if exists spark_6016_fix") + +// Create a DataFrame with two partitions. So, the created table will have two parquet files. +val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") +checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) +) + +// Create a DataFrame with four partitions. So, the created table will have four parquet files. +val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4) + df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") +// For the bug of SPARK-6016, we are caching two outdated footers for df1. 
Then, +// since the new table has four parquet files, we are trying to read new footers from two files +// and then merge metadata in footers of these four (two outdated ones and two latest one), +// which will cause an error. +checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) +) + +sql("drop table spark_6016_fix") + } + + tes
[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22467#discussion_r218846178 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * A suite of tests for the Parquet support through the data sources API. 
+ */ +class HiveParquetSourceSuite extends ParquetPartitioningTest { + import testImplicits._ + import spark._ + + override def beforeAll(): Unit = { +super.beforeAll() +dropTables("partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes", + "normal_parquet") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDir.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithKey.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW normal_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${new File(partitionedTableDir, "p=1").toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' + ) +""") + +sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( +path '${partitionedTableDirWithComplexTypes.toURI}' + ) +""") + } + + test("SPARK-6016 make sure to use the latest footers") { +sql("drop table if exists spark_6016_fix") --- End diff --

This case can be wrapped with `withTable("spark_6016_fix")`.
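The suggestion is the loan pattern: the helper owns cleanup, so the table is dropped even if the test body throws. A minimal sketch, assuming an in-memory set as a hypothetical stand-in for the catalog; Spark's test utilities provide a real `withTable`, and this only mimics its shape.

```scala
import scala.collection.mutable

object WithTableSketch {
  // Hypothetical in-memory "catalog" standing in for real tables.
  val tables: mutable.Set[String] = mutable.Set.empty

  def createTable(name: String): Unit = tables += name
  def dropTableIfExists(name: String): Unit = tables -= name

  // Loan pattern: run `body`, then always drop the named tables,
  // even when `body` throws (e.g. a failed checkAnswer).
  def withTable(names: String*)(body: => Unit): Unit =
    try body finally names.foreach(dropTableIfExists)
}
```

With a helper of this shape, the SPARK-6016 test body needs neither the leading `drop table if exists` nor the trailing `drop table`.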
[GitHub] spark pull request #22467: [SPARK-25465][TEST] Refactor Parquet test suites ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22467#discussion_r218835682 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils + +// The data where the partitioning key exists only in the directory structure. 
+case class ParquetData(intField: Int, stringField: String) +// The data that also includes the partitioning key +case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) + +case class StructContainer(intStructField: Int, stringStructField: String) + +case class ParquetDataWithComplexTypes( +intField: Int, +stringField: String, +structField: StructContainer, +arrayField: Seq[Int]) + +case class ParquetDataWithKeyAndComplexTypes( +p: Int, +intField: Int, +stringField: String, +structField: StructContainer, +arrayField: Seq[Int]) + +/** + * A collection of tests for parquet data with various forms of partitioning. + */ +abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton { + import testImplicits._ + + var partitionedTableDir: File = null + var normalTableDir: File = null + var partitionedTableDirWithKey: File = null + var partitionedTableDirWithComplexTypes: File = null + var partitionedTableDirWithKeyAndComplexTypes: File = null + + override def beforeAll(): Unit = { +super.beforeAll() +partitionedTableDir = Utils.createTempDir() +normalTableDir = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) +.map(i => ParquetData(i, s"part-$p")) +.toDF() +.write.parquet(partDir.getCanonicalPath) +} + +sparkContext + .makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-1")) + .toDF() + .write.parquet(new File(normalTableDir, "normal").getCanonicalPath) + +partitionedTableDirWithKey = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKey, s"p=$p") + sparkContext.makeRDD(1 to 10) +.map(i => ParquetDataWithKey(p, i, s"part-$p")) +.toDF() +.write.parquet(partDir.getCanonicalPath) +} + +partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map 
{ i => +ParquetDataWithKeyAndComplexTypes( + p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) +} + +partitionedTableDirWithComplexTypes = Utils.createTempDir() + +(1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => +ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) +} + } + + override protected def afterAll(): Unit = { +try { + partitionedTableDir.delete() + normalTableDir.delete() + partitionedTableDirWithKey.delete() + partitionedTableDirWithComplexTypes.dele
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r218660634

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
    * Create an in-memory store for a live application.
    */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None):
--- End diff --

Yes, sorry for the late reply. :(
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773

@maropu @dongjoon-hyun Thanks a lot for your guidance!

```
Apache Spark already supports changing column types as a part of schema evolution. Especially, ORC vectorized reader support upcasting although it's not the same with canCast. For the detail support Spark coverage, see SPARK-23007. It covered all built-in data source at that time.
```

Thanks, I'll study this background soon.

```
Please note that every data sources have different capability. So, this PR needs to prevent ALTER TABLE CHANGE COLUMN for those data sources case-by-case. And, we need corresponding test cases.
```

Got it. I'll keep working through these cases in this PR: I've roughly split the work into 4 tasks and will update the PR description first. Each task will come with its corresponding test cases.
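Upcasting here means a widening change that every stored value survives unchanged. A minimal sketch of such a check, assuming a hypothetical and deliberately conservative whitelist; it is not Spark's `canCast`, nor the exact set of conversions any reader supports.

```scala
object UpcastSketch {
  // Hypothetical whitelist of lossless widenings; NOT Spark's Cast.canCast
  // or any reader's actual support matrix.
  private val widenings: Map[String, Set[String]] = Map(
    "byte" -> Set("short", "int", "long"),
    "short" -> Set("int", "long"),
    "int" -> Set("long"),
    "float" -> Set("double"))

  // A column type change is accepted only if it is a no-op or a listed widening.
  def isSafeUpcast(from: String, to: String): Boolean =
    from == to || widenings.getOrElse(from, Set.empty[String]).contains(to)
}
```

Under this rule `int -> long` passes, while `long -> int` or `int -> string` is rejected, since those would need a real conversion that may fail or lose information.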
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22381 retest this please.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 retest this please.
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22381 The UT failure was fixed by #22452.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 The UT failure was fixed by #22452. retest this please.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 retest this please.
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r218328998

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -84,20 +88,20 @@ private[spark] class BarrierCoordinator(

   /**
    * Provide the current state of a barrier() call. A state is created when a new stage attempt
-   * sends out a barrier() call, and recycled on stage completed.
+   * sends out a barrier() call, and recycled on stage completed. Visible for testing.
    *
    * @param barrierId Identifier of the barrier stage that make a barrier() call.
    * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall
    *                 collect `numTasks` requests to succeed.
    */
-  private class ContextBarrierState(
+  private[spark] class ContextBarrierState(
       val barrierId: ContextBarrierId,
       val numTasks: Int) {

     // There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used
     // to identify each barrier() call. It shall get increased when a barrier() call succeeds, or
     // reset when a barrier() call fails due to timeout.
-    private var barrierEpoch: Int = 0
+    private[spark] var barrierEpoch: Int = 0
--- End diff --

Makes sense, done in ec8466a.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r218324420

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
         s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }

-  // Add the comment to a column, if comment is empty, return the original column.
-  private def addComment(column: StructField, comment: Option[String]): StructField = {
-    comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for your advice! I've looked into this over the past few days. With the current implementation, all behavior matches Hive: type changes are supported, queries work well on non-binary formats, and an exception is thrown on binary formats such as ORC and Parquet. Would it be OK to add a config to constrain this?

Adding logic to cast input data into the changed type recorded in the catalog may require modifying 4 code paths: the vectorized reader and the row reader, for both Parquet and ORC. If we don't agree with the current behavior, I'll keep working on these.

| Item | Behavior |
|---|---|
| Parquet Row Reader | ClassCastException in SpecificInternalRow.set${Type} |
| Parquet Vectorized Reader | SchemaColumnConvertNotSupportedException in VectorizedColumnReader.read${Type}Batch |
| Orc Row Reader | ClassCastException in OrcDeserializer.newWriter |
| Orc Vectorized Reader | NullPointerException in OrcColumnVector get value by type method |
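One way to read the proposed config: reject `ALTER TABLE ... CHANGE COLUMN` type changes up front for the binary formats whose readers fail after a type change, unless the user opts in. A minimal sketch, where the format objects and the `allowBinaryTypeChange` flag are hypothetical and not an existing Spark config.

```scala
object TypeChangeGuardSketch {
  sealed trait Format
  case object Text extends Format
  case object Parquet extends Format
  case object Orc extends Format

  // Hypothetical guard; `allowBinaryTypeChange` stands in for a new config.
  // Parquet/ORC readers fail at read time after a type change, so the change
  // is rejected at DDL time unless explicitly allowed.
  def checkTypeChange(format: Format, allowBinaryTypeChange: Boolean): Either[String, Unit] =
    format match {
      case Parquet | Orc if !allowBinaryTypeChange =>
        Left(s"Changing a column type is not supported for $format tables")
      case _ => Right(())
    }
}
```

Failing at DDL time gives users a clear error instead of the reader-specific exceptions listed in the comment.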
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 Gentle ping @jiangxb1987
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 retest this please.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r217677591

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---
@@ -205,6 +205,34 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }

+  allFileBasedDataSources.foreach { format =>
--- End diff --

@kiszk Added a UT in FileBasedDataSourceSuite, as we discussed in https://github.com/apache/spark/pull/21618#discussion_r216156188.
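The idea behind this PR, expanding many glob paths concurrently instead of one by one, can be sketched with Scala futures. This is an illustration under assumptions: `expandAll`, its `threshold` parameter, and the injected `list` function are hypothetical and are not the PR's `SparkGlobber` API. Small inputs stay sequential to avoid scheduling overhead.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelGlobSketch {
  // Hypothetical helper: expand each directory with `list`, in parallel only once
  // the number of directories exceeds `threshold`. Output order follows `dirs`.
  def expandAll(dirs: Seq[String], threshold: Int)(list: String => Seq[String])
               (implicit ec: ExecutionContext): Seq[String] =
    if (dirs.size <= threshold) dirs.flatMap(list)
    else Await.result(Future.traverse(dirs)(d => Future(list(d))), 1.minute).flatten
}
```

Both branches return the same result; only the scheduling differs, which is what a test over all file-based sources would want to confirm.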
[GitHub] spark pull request #22402: [SPARK-25414][SS] The numInputRows metrics can be...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22402#discussion_r217440426

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -460,9 +460,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")

     val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
-    assert(progress.numInputRows === 20) // data is read multiple times in self-joins
--- End diff --

So IIUC, in this line EXCHANGE_REUSE_ENABLED is true, so the data is actually not read twice?
[GitHub] spark pull request #22412: [SPARK-25404][SQL] Staging path may not on the ex...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22412#discussion_r217431206

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -217,12 +217,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
--- End diff --

This code was introduced in https://github.com/apache/spark/pull/12770. Did you check the related UTs? Probably the ones related to SessionState and DDLSuite.
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22386 Thanks a lot for your comment and the fix, @mukulmurthy! We'll port this soon as well.
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22386#discussion_r216730172

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
@@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](

     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
-    val currentVersion = EpochTracker.getCurrentEpoch match {
-      case None => storeVersion
-      case Some(value) => value
+    val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
--- End diff --

A plain `toBoolean` is fine here, because you set a default value on both the MicroBatch and Continuous sides.
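The trade-off behind this suggestion: wrapping in `Option(...)` guards against the property being absent (null), while a plain `toBoolean` throws in that case and is therefore only safe when every execution path sets the property. A minimal sketch using `java.util.Properties` as a stand-in for task-local properties; the key name is hypothetical.

```scala
import java.util.Properties

object LocalPropertySketch {
  // Hypothetical key; not necessarily the value of ContinuousExecution.IS_CONTINUOUS_PROCESSING.
  val IsContinuousKey = "is_continuous_processing_sketch"

  // Defensive form: a missing property is treated as false.
  def isContinuousDefensive(props: Properties): Boolean =
    Option(props.getProperty(IsContinuousKey)).exists(_.toBoolean)

  // Simple form: throws if the property was never set, so it relies on both
  // the microbatch and continuous paths always setting a value.
  def isContinuousSimple(props: Properties): Boolean =
    props.getProperty(IsContinuousKey).toBoolean
}
```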
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22386

```
If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment
```

I'm a little confused about this scenario. Could you explain it in more detail?
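The quoted scenario is presumably about per-thread state outliving the job that set it. Assuming the current epoch is held in a thread-local, a later microbatch task that runs on the same reused thread would see the stale value. A minimal sketch with a single-thread pool; `EpochTrackerSketch` is a hypothetical stand-in, not Spark's `EpochTracker`.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadReuseSketch {
  // Hypothetical per-thread epoch holder; NOT Spark's EpochTracker.
  object EpochTrackerSketch {
    private val currentEpoch: ThreadLocal[Option[Long]] = new ThreadLocal[Option[Long]] {
      override protected def initialValue(): Option[Long] = None
    }
    def set(epoch: Long): Unit = currentEpoch.set(Some(epoch))
    def get: Option[Long] = currentEpoch.get()
  }

  // Runs a "continuous" task that sets an epoch, then a "microbatch" task on the
  // same (only) pool thread, and returns what the second task observes.
  def epochSeenByReusedThread(): Option[Long] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      pool.submit(new Runnable { def run(): Unit = EpochTrackerSketch.set(7L) }).get()
      pool.submit(new Callable[Option[Long]] {
        def call(): Option[Long] = EpochTrackerSketch.get
      }).get()
    } finally pool.shutdown()
  }
}
```

The second task sees `Some(7)` even though it never set an epoch. Deciding the mode from an explicit per-job property, as the diff does with `IS_CONTINUOUS_PROCESSING`, avoids depending on leftover thread state.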
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22386#discussion_r216723459

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
       false))
   }

+  test("is_continuous_processing property should be false for microbatch processing") {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+    testStream(df) (
+      AddData(input, 1),
+      CheckAnswer("false")
+    )
+  }
+
+  test("is_continuous_processing property should be true for continuous processing") {
+    val input = ContinuousMemoryStream[Int]
+    var x: String = ""
--- End diff --

Is this var unused?
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22386#discussion_r216723249

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
       false))
   }

+  test("is_continuous_processing property should be false for microbatch processing") {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+    testStream(df) (
+      AddData(input, 1),
+      CheckAnswer("false")
+    )
+  }
+
+  test("is_continuous_processing property should be true for continuous processing") {
+    val input = ContinuousMemoryStream[Int]
+    var x: String = ""
--- End diff --

Unused var?
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attributes f...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22326

@cloud-fan Thanks for your comment.

```
IIUC, you are pulling out the join condition with python UDF and create a filter above join. Then the join become a cross join, which usually runs very slowly.
```

Yes, that's right.

```
I think we should keep the cross join check for this case.
```

Yes. As Marco suggested, the current behavior is to gate the cross join with the `crossJoinEnabled` config: if crossJoinEnabled = false, an AnalysisException is thrown.
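The rewrite under discussion moves a join condition that contains a Python UDF into a `Filter` above a condition-less (cross) join, and refuses to do so when cross joins are disabled. It can be sketched on a hypothetical miniature plan ADT. None of these classes are Catalyst's, a real rule would likely pull out only the conjuncts that contain UDFs, and Spark raises `AnalysisException` rather than the `IllegalStateException` used here.

```scala
object PythonUdfJoinSketch {
  // Hypothetical expression and plan ADTs; NOT Spark's Catalyst classes.
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class PythonUdf(name: String, args: Seq[Expr]) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Join(left: Plan, right: Plan, joinType: String, condition: Option[Expr]) extends Plan
  case class Filter(condition: Expr, child: Plan) extends Plan

  def hasPythonUdf(e: Expr): Boolean = e match {
    case _: PythonUdf => true
    case EqualTo(l, r) => hasPythonUdf(l) || hasPythonUdf(r)
    case _: Col => false
  }

  // Pull a UDF-bearing condition out of an inner join. The remaining join has no
  // condition, i.e. it is a cross join, so it is only allowed when cross joins are enabled.
  def pullOutPythonUdf(plan: Plan, crossJoinEnabled: Boolean): Plan = plan match {
    case Join(left, right, "inner", Some(cond)) if hasPythonUdf(cond) =>
      if (!crossJoinEnabled) {
        throw new IllegalStateException(
          "Join condition uses a Python UDF and would become a cross join; enable cross joins")
      }
      Filter(cond, Join(left, right, "cross", None))
    case other => other
  }
}
```

Joins whose conditions contain no UDF are left untouched, so the cross-join check only affects the new code path.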
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216606555

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
       "x.a".attr === Rand(10) && "y.b".attr === 5))
     val correctAnswer =
       x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
-        condition = Some("x.a".attr === Rand(10)))
+        joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Thanks @mgaido91 for the detailed review and advice. For my part, I'd prefer to limit the scope of the change to Python UDFs only, or at least to Unevaluable expressions only. Waiting for others' advice.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216600156 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff --
```
Although the query above doesn't work well, why do users change column types?
```
In the scenario described above, the user first uses INT but later finds that a LONG is needed. They can rewrite the new data as LONG and load it into new partitions. If we do not support the type change, the user has to recreate the table just to change the column type. And yes, if the data is not stored in a binary file format, the query works OK:
```
Logging initialized using configuration in jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT);
OK
Time taken: 2.576 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);;
Query ID = XuanYuan_20180911164348_32238a6c-b0a4-4cfd-aa3d-00a7628031cf
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2018-09-11 16:43:51,684 Stage-1 map = 100%, reduce = 0%
Ended Job = job_local162423_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-11_16-43-48_117_2262603440504094412-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=6, rawDataSize=5]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.025 seconds
hive> select * from t;;
OK
1 a 3
Time taken: 0.164 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.177 seconds
hive> select * from t;
OK
1 a 3
Time taken: 0.12 seconds, Fetched: 1 row(s)
hive> quit;
```
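Why the text-format table still reads correctly after the ALTER can be sketched with a toy schema-on-read model. It assumes Hive's default `\x01` (Ctrl-A) field delimiter for text tables; `read_text_row` and the cast table are hypothetical illustrations, not Hive's SerDe code. The file holds only characters, so changing column `a` from INT to STRING changes how the same bytes are parsed, not the bytes themselves.

```python
# Toy schema-on-read model (hypothetical, not Hive's SerDe).
# Assumption: text tables separate fields with Hive's default '\x01' delimiter.
CASTS = {"INT": int, "BIGINT": int, "STRING": str}


def read_text_row(line, schema, sep="\x01"):
    """Parse one stored text line using the table's *current* schema."""
    values = line.split(sep)
    return tuple(CASTS[col_type](value) for (_, col_type), value in zip(schema, values))


# Assumed on-disk line for INSERT INTO t VALUES (1, 'a', 3).
stored_line = "1\x01a\x013"

schema_before = [("a", "INT"), ("b", "STRING"), ("c", "INT")]
# After ALTER TABLE t CHANGE a a STRING only the metadata differs.
schema_after = [("a", "STRING"), ("b", "STRING"), ("c", "INT")]

row_before = read_text_row(stored_line, schema_before)
row_after = read_text_row(stored_line, schema_after)
```

Both reads succeed; only the Python type of the first field changes.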
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216588353 --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java --- @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * This is based on hadoop-common-2.7.2 + * {@link org.apache.hadoop.fs.Globber}. + * This class exposes globWithThreshold which can be used glob path in parallel. 
+ */ +public class SparkGlobber { + public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName()); + + private final FileSystem fs; + private final FileContext fc; + private final Path pathPattern; + + public SparkGlobber(FileSystem fs, Path pathPattern) { +this.fs = fs; +this.fc = null; +this.pathPattern = pathPattern; + } + + public SparkGlobber(FileContext fc, Path pathPattern) { +this.fs = null; +this.fc = fc; +this.pathPattern = pathPattern; + } + + private FileStatus getFileStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.getFileStatus(path); + } else { +return fc.getFileStatus(path); + } +} catch (FileNotFoundException e) { + return null; +} + } + + private FileStatus[] listStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.listStatus(path); + } else { +return fc.util().listStatus(path); + } +} catch (FileNotFoundException e) { + return new FileStatus[0]; +} + } + + private Path fixRelativePart(Path path) { +if (fs != null) { + return fs.fixRelativePart(path); +} else { + return fc.fixRelativePart(path); +} + } + + /** + * Convert a path component that contains backslash ecape sequences to a + * literal string. This is necessary when you want to explicitly refer to a + * path that contains globber metacharacters. + */ + private static String unescapePathComponent(String name) { +return name.replaceAll("\\\\(.)", "$1"); + } + + /** + * Translate an absolute path into a list of path components. + * We merge double slashes into a single slash here. + * POSIX root path, i.e. '/', does not get an entry in the list.
+ */ + private static List<String> getPathComponents(String path) + throws IOException { +ArrayList<String> ret = new ArrayList<String>(); +for (String component : path.split(Path.SEPARATOR)) { + if (!component.isEmpty()) { +ret.add(component); + } +} +return ret; + } + + private String schemeFromPath(Path path) throws IOException { +String scheme = path.toUri().getScheme(); +if (scheme == null) { + if (fs != null) { +scheme = fs.getUri().getScheme(); + } else { +scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme(); + } +} +return scheme; + } + + private String authorityFromPath(Path path) throws IOException { +String authority = path.toUri().getAuthority(); +if (authority == null) { + if (fs != null) { +authority = fs.getUri().getAuthority(); + } else { +authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority(); + } +} +return authority ; + } + + public FileStatus[] globWithThreshold(int threshold) throws IOException { +
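The two static helpers quoted above, `unescapePathComponent` and `getPathComponents`, can be mirrored in a few lines of Python to make their contract concrete. This is an illustrative translation, assuming `Path.SEPARATOR` is `"/"`; the function names are hypothetical Python equivalents, not part of Hadoop or Spark.

```python
import re
from typing import List


def unescape_path_component(name: str) -> str:
    # Same regex as the Java literal "\\\\(.)": a backslash followed by any
    # character is replaced by that character, so "a\*b" becomes "a*b".
    return re.sub(r"\\(.)", r"\1", name)


def get_path_components(path: str) -> List[str]:
    # Split on "/" and drop empty pieces: "//" collapses to one separator
    # and the POSIX root "/" contributes no component.
    return [component for component in path.split("/") if component]
```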
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216587584 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala --- @@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { }) } + test("test expanding glob path") { --- End diff -- No problem, I'll fix this.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216585000 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) --- End diff -- Judging from the code in `canEvaluateWithinJoin`, the scopes nest as follows: (CannotEvaluateWithinJoin = nondeterministic + Unevaluable) > Unevaluable > PythonUDF. So, to be safe, maybe I should limit the change to the narrowest scope, PythonUDF only. I'd appreciate your advice, thanks :) https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L104-L120
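The nesting of the three candidate scopes can be checked with a toy model of the rule. `Expr`, its flags, and the scope helpers are hypothetical simplifications (the real `canEvaluateWithinJoin` also special-cases subquery expressions). A condition using `Rand` falls only in the widest scope, while a Python UDF falls in all three.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Expr:
    """Toy stand-in for a Catalyst expression (hypothetical)."""
    name: str
    children: List["Expr"] = field(default_factory=list)
    deterministic: bool = True   # False for e.g. Rand
    unevaluable: bool = False    # True for e.g. PythonUDF


def can_evaluate_within_join(e: Expr) -> bool:
    # Simplified: nondeterministic or Unevaluable anywhere in the tree -> False.
    if not e.deterministic or e.unevaluable:
        return False
    return all(can_evaluate_within_join(c) for c in e.children)


def contains(e: Expr, pred: Callable[[Expr], bool]) -> bool:
    return pred(e) or any(contains(c, pred) for c in e.children)


# The three candidate scopes, from widest to narrowest.
def in_cannot_evaluate_scope(e: Expr) -> bool:
    return not can_evaluate_within_join(e)


def in_unevaluable_scope(e: Expr) -> bool:
    return contains(e, lambda x: x.unevaluable)


def in_python_udf_scope(e: Expr) -> bool:
    return contains(e, lambda x: x.name == "PythonUDF")


a, b = Expr("a"), Expr("b")
rand_cond = Expr("EqualTo", [a, Expr("Rand", deterministic=False)])
udf_cond = Expr("PythonUDF", [a, b], unevaluable=True)
```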
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216583509 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) --- End diff -- Yes, I changed this to make the test pass. My original thought was that nondeterministic expressions in join conditions are not supported yet, so this is not a big problem. https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L105 https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1158-L1159 But now I think I should be more careful here and limit the cross join change to the PythonUDF case only. WDYT, @mgaido91? Thanks.
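The shape of the new expected plan in `FilterPushdownSuite` can be reproduced with a toy rewrite over conjuncts. Everything here is hypothetical (tuples instead of logical plans, a boolean flag instead of `canEvaluateWithinJoin`); it only illustrates the idea that deterministic single-side conjuncts are pushed into each child, while the conjuncts the join cannot evaluate move into a `Filter` above a cross join.

```python
from dataclasses import dataclass
from typing import FrozenSet, List


@dataclass(frozen=True)
class Cond:
    """One conjunct of the join condition (toy model)."""
    text: str
    refs: FrozenSet[str]            # attributes the conjunct references
    evaluable_in_join: bool = True  # False for e.g. Rand(10) or a Python UDF


def rewrite_inner_join(left_name: str, left_attrs: FrozenSet[str],
                       right_name: str, right_attrs: FrozenSet[str],
                       conds: List[Cond]):
    ok = [c for c in conds if c.evaluable_in_join]
    others = [c.text for c in conds if not c.evaluable_in_join]
    # Single-side conjuncts are pushed into the corresponding child.
    left_filter = [c.text for c in ok if c.refs <= left_attrs]
    right_filter = [c.text for c in ok
                    if c.refs <= right_attrs and not c.refs <= left_attrs]
    # Conjuncts that need both sides stay in the join condition.
    join_cond = [c.text for c in ok
                 if not (c.refs <= left_attrs or c.refs <= right_attrs)]
    # If only unevaluable conjuncts were left to join on, use a cross join.
    join_type = "Cross" if others and not join_cond else "Inner"
    plan = ("Join", ("Filter", left_filter, left_name),
            ("Filter", right_filter, right_name), join_type, join_cond)
    # Conjuncts the join cannot evaluate are applied by a Filter above it.
    return ("Filter", others, plan) if others else plan
```

Fed the conjuncts from the test (`x.a = 5`, `y.a = 5`, `x.a = rand(10)`, `y.b = 5`), it yields a filtered cross join with `x.a = rand(10)` applied on top.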
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/22369
[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22369 Per the comment in https://github.com/apache/spark/pull/22140#issuecomment-419997180, I think this doc change is no longer needed, so I'm closing this. Thanks @BryanCutler and @HyukjinKwon!
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377882 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ --- End diff -- nit: Source {
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216378185 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- nit: `: AppStatusStore`?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377621 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} + } + else { --- End diff -- nit: } else {
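The pattern under review, an optional metrics source whose counters are bumped only when one was passed in (`appStatusSource.foreach { ... }`), can be sketched in Python. `Counter` imitates the `inc(n)` method of a Dropwizard/Codahale counter. The class names and the `unblacklisted_executors` counter are hypothetical, since the diff above is cut off before the body of the `else` branch.

```python
from typing import Optional


class Counter:
    """Minimal stand-in for com.codahale.metrics.Counter."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n


class ToyAppStatusSource:
    # Hypothetical counter names, for illustration only.
    def __init__(self) -> None:
        self.blacklisted_executors = Counter()
        self.unblacklisted_executors = Counter()


def update_blacklist_status(source: Optional[ToyAppStatusSource],
                            blacklisted: bool) -> None:
    # Equivalent of Option.foreach: no registered source means no metrics work.
    if source is None:
        return
    if blacklisted:
        source.blacklisted_executors.inc(1)
    else:
        source.unblacklisted_executors.inc(1)
```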
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377526 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging { setupAndStartListenerBus() postEnvironmentUpdate() +_env.metricsSystem.registerSource(appStatusSource) --- End diff -- It would be better to move this line to +574.
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22369#discussion_r216189359 --- Diff: docs/sql-programming-guide.md --- @@ -1901,6 +1901,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above - As of version 2.3.1 Arrow functionality, including `pandas_udf` and `toPandas()`/`createDataFrame()` with `spark.sql.execution.arrow.enabled` set to `True`, has been marked as experimental. These are still evolving and not currently recommended for use in production. + - In version 2.3.1 and earlier, it is possible for PySpark to create a Row object by providing more value than column number through the customized Row class. Since Spark 2.3.3, Spark will confirm value length is less or equal than column length in PySpark. See [SPARK-25072](https://issues.apache.org/jira/browse/SPARK-25072) for details. --- End diff -- Thanks Bryan, I'll address this after discussion.
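The behavior change described in the proposed migration note can be illustrated with a toy factory that mimics a customized Row class such as `Row("x", "y")`. `ToyRowFactory` is hypothetical and returns a plain `dict`; PySpark's real `Row` is a `tuple` subclass with its own error message.

```python
class ToyRowFactory:
    """Toy stand-in for a customized PySpark Row class, e.g. Row("x", "y")."""

    def __init__(self, *fields: str) -> None:
        self.fields = fields

    def __call__(self, *values):
        # New behavior: more values than fields is rejected instead of accepted.
        if len(values) > len(self.fields):
            raise ValueError(
                f"expected at most {len(self.fields)} values, got {len(values)}")
        return dict(zip(self.fields, values))


Point = ToyRowFactory("x", "y")
```

`Point(1, 2)` builds a row, while `Point(1, 2, 3)` now raises instead of silently carrying an extra value.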
[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22369 Got it, thanks @HyukjinKwon.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147915 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion.") + .intConf + .checkValue(threshold => threshold >= 0, "The maximum number of subfiles or directories " + --- End diff -- Maybe we should keep this public, because parallel globbing is only enabled when the number of threads is > 0.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147921 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. --- End diff -- Thanks, done in 1319cd3.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147919 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. + */ + private def getGlobbedPaths( --- End diff -- Thanks, that's clearer; done in 1319cd3.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147889 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. + */ + private def getGlobbedPaths( + sparkSession: SparkSession, --- End diff -- Thanks for the advice; done in 1319cd3.
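The control flow described in the `getGlobbedPaths` doc comment (expand on the calling thread by default, use a local thread pool when `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0) can be sketched as follows. Matching uses Python's `fnmatch` against an in-memory listing, which is an assumption for illustration: unlike Hadoop's globber, `fnmatch`'s `*` also matches `/`. The threshold setting is not modeled, and the function names are hypothetical.

```python
import fnmatch
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence


def expand_one(pattern: str, listing: Sequence[str]) -> List[str]:
    # Stand-in for globbing one pattern against the file system.
    return sorted(p for p in listing if fnmatch.fnmatchcase(p, pattern))


def get_globbed_paths(patterns: Sequence[str], listing: Sequence[str],
                      num_threads: int = 0) -> List[str]:
    if num_threads <= 0:
        # Default: expand every pattern sequentially on the calling thread.
        groups = [expand_one(p, listing) for p in patterns]
    else:
        # Parallel: a local pool expands the patterns concurrently;
        # Executor.map still returns results in input order.
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            groups = list(pool.map(lambda p: expand_one(p, listing), patterns))
    return [path for group in groups for path in group]
```

Because `map` preserves input order, both modes return the same list, which makes it easy to test them against each other.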
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147887 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1557,6 +1576,15 @@ class SQLConf extends Serializable with Logging { def parallelPartitionDiscoveryParallelism: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) + def parallelGetGlobbedPathThreshold: Int = +getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_THRESHOLD) + + def parallelGetGlobbedPathNumThreads: Int = +getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_NUM_THREADS) + + def parallelGetGlobbedPathEnabled: Boolean = --- End diff -- Thanks, done in 1319cd3.
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 ``` @xuanyuanking Could you please update the document? ``` #22369 Thanks for the reminder; I'll pay attention to this in future work.
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22369 [SPARK-25072][DOC] Update migration guide for behavior change ## What changes were proposed in this pull request? Update the document for the behavior change in PySpark Row creation. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25072-DOC Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22369 commit d257a38c647b45a9e83a2bdbbd2814f1b3fc5d56 Author: Yuanjian Li Date: 2018-09-09T04:26:23Z Update doc for SPARK-25072
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 @kiszk @maropu Many thanks for your review and advice! I'll address the comments and resolve the conflicts ASAP.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216133261 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala --- @@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { }) } + test("test expanding glob path") { --- End diff -- ``` IIUC, the new feature is disabled as default since spark.sql.sources.parallelGetGlobbedPath.numThreads is 0. ``` Yes, that's right. ``` I am afraid these test causes are executed only with disabling the new feature. ``` These tests mainly check the correctness of `sparkHadoopUtil.expandGlobPath`, so I think they are worth keeping.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216132926 --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java --- @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * This is based on hadoop-common-2.7.2 + * {@link org.apache.hadoop.fs.Globber}. + * This class exposes globWithThreshold which can be used glob path in parallel. 
+ */ +public class SparkGlobber { + public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName()); + + private final FileSystem fs; + private final FileContext fc; + private final Path pathPattern; + + public SparkGlobber(FileSystem fs, Path pathPattern) { +this.fs = fs; +this.fc = null; +this.pathPattern = pathPattern; + } + + public SparkGlobber(FileContext fc, Path pathPattern) { +this.fs = null; +this.fc = fc; +this.pathPattern = pathPattern; + } + + private FileStatus getFileStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.getFileStatus(path); + } else { +return fc.getFileStatus(path); + } +} catch (FileNotFoundException e) { + return null; +} + } + + private FileStatus[] listStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.listStatus(path); + } else { +return fc.util().listStatus(path); + } +} catch (FileNotFoundException e) { + return new FileStatus[0]; +} + } + + private Path fixRelativePart(Path path) { +if (fs != null) { + return fs.fixRelativePart(path); +} else { + return fc.fixRelativePart(path); +} + } + + /** + * Convert a path component that contains backslash ecape sequences to a + * literal string. This is necessary when you want to explicitly refer to a + * path that contains globber metacharacters. + */ + private static String unescapePathComponent(String name) { +return name.replaceAll("\\\\(.)", "$1"); + } + + /** + * Translate an absolute path into a list of path components. + * We merge double slashes into a single slash here. + * POSIX root path, i.e. '/', does not get an entry in the list.
+ */ + private static List<String> getPathComponents(String path) + throws IOException { +ArrayList<String> ret = new ArrayList<String>(); +for (String component : path.split(Path.SEPARATOR)) { + if (!component.isEmpty()) { +ret.add(component); + } +} +return ret; + } + + private String schemeFromPath(Path path) throws IOException { +String scheme = path.toUri().getScheme(); +if (scheme == null) { + if (fs != null) { +scheme = fs.getUri().getScheme(); + } else { +scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme(); + } +} +return scheme; + } + + private String authorityFromPath(Path path) throws IOException { +String authority = path.toUri().getAuthority(); +if (authority == null) { + if (fs != null) { +authority = fs.getUri().getAuthority(); + } else { +authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority(); + } +} +return authority ; + } + + public FileStatus[] globWithThreshold(int threshold) throws IOException { +
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216132779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- It doesn't work in Hive either; I tested with Hive 1.2.2 and Hadoop 2.7.3:
```
Logging initialized using configuration in jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT) stored as parquet;
OK
Time taken: 1.604 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);
Query ID = XuanYuan_20180908230549_3c8732ff-07e0-4a7a-95b4-260aed04a762
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-09-08 23:06:08,732 Stage-1 map = 0%, reduce = 0%
2018-09-08 23:06:09,743 Stage-1 map = 100%, reduce = 0%
Ended Job = job_local712899233_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-08_23-05-49_782_100109481692677607-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=343, rawDataSize=3]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 20.294 seconds
hive> select * from t;
OK
1 a 3
Time taken: 0.154 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.18 seconds
hive> select * from t;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.IntWritable
Time taken: 0.116 seconds
hive>
```
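Unlike a text file, a binary columnar file such as Parquet records a physical type for each column, so the reader has to reconcile the stored type with the newly declared type. The toy model below assumes a reader that only accepts a declared type matching what was written; the mapping and names are hypothetical, not Hive's Parquet SerDe.

```python
from typing import Any, Tuple

# What the file remembers for column `a` after INSERT INTO t VALUES (1, 'a', 3):
# a physical type tag plus the value (toy representation).
stored_a: Tuple[str, Any] = ("int32", 1)

# Toy reader table: which declared types can read which physical types.
READERS = {
    ("int32", "INT"): int,
    ("binary", "STRING"): str,
}


def read_value(stored: Tuple[str, Any], declared_type: str) -> Any:
    physical, value = stored
    reader = READERS.get((physical, declared_type))
    if reader is None:
        # Analogous to "Cannot inspect org.apache.hadoop.io.IntWritable".
        raise TypeError(f"cannot read physical type {physical} as {declared_type}")
    return reader(value)
```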
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127932 --- Diff: python/pyspark/sql/tests.py --- @@ -547,6 +547,74 @@ def test_udf_in_filter_on_top_of_join(self): df = left.crossJoin(right).filter(f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) +def test_udf_in_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(b=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b")) +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, b=1)]) + +def test_udf_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b"), "leftsemi") +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_filter_in_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf(non-deterministic) +# and normal filter(deterministic) +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) + +def test_udf_and_filter_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf(non-deterministic) +# and normal filter(deterministic) +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_common_filter_in_join_condition(self): --- End diff -- I added these two tests to address the comment in https://github.com/apache/spark/pull/22326#discussion_r216127673.
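The expected results in these tests follow from plain join semantics, which a small nested-loop model makes easy to check by hand. The helpers are hypothetical pure-Python stand-ins: rows are dicts, and `udf_eq` plays the role of the Python UDF `lambda a, b: a == b`.

```python
from typing import Callable, Dict, List

Rec = Dict[str, int]


def inner_join(left: List[Rec], right: List[Rec],
               cond: Callable[[Rec, Rec], bool]) -> List[Rec]:
    # Every (l, r) pair that satisfies the condition, with columns from both sides.
    return [{**l, **r} for l in left for r in right if cond(l, r)]


def left_semi_join(left: List[Rec], right: List[Rec],
                   cond: Callable[[Rec, Rec], bool]) -> List[Rec]:
    # Each left row at most once, if some right row satisfies the condition.
    return [l for l in left if any(cond(l, r) for r in right)]


def udf_eq(x: int, y: int) -> bool:
    return x == y


left = [{"a": 1, "a1": 1, "a2": 1}, {"a": 2, "a1": 2, "a2": 2}]
right = [{"b": 1, "b1": 1, "b2": 1}, {"b": 2, "b1": 1, "b2": 2}]


# Same predicate as [f("a", "b1"), left.a == 1, right.b == 2].
def mixed(l: Rec, r: Rec) -> bool:
    return udf_eq(l["a"], r["b1"]) and l["a"] == 1 and r["b"] == 2
```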
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216127904 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- So maybe we should also add a conf, like `MetastoreConf.ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES` in Hive, to guard this behavior? WDYT? Thanks.
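A minimal sketch of what such a guard could look like, assuming a boolean setting in the spirit of Hive's `DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES`. The function, the flag parameter, and the allow-list of widening changes are all hypothetical and much smaller than Hive's real compatibility rules.

```python
# Hypothetical allow-list of widening type changes, for illustration only.
COMPATIBLE_CHANGES = {
    ("TINYINT", "SMALLINT"),
    ("SMALLINT", "INT"),
    ("INT", "BIGINT"),      # e.g. the INT -> LONG scenario
    ("FLOAT", "DOUBLE"),
}


def check_column_type_change(old_type: str, new_type: str,
                             disallow_incompatible: bool = True) -> None:
    """Raise if the change is not allowed; return None otherwise."""
    if old_type == new_type or not disallow_incompatible:
        return
    if (old_type, new_type) not in COMPATIBLE_CHANGES:
        raise ValueError(f"incompatible column type change: {old_type} -> {new_type}")
```

With the flag on, `INT -> BIGINT` passes and `STRING -> INT` is rejected; turning the flag off lets users opt into any change.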
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216127880 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for checking, and my mistake for not describing the intention of this feature. By supporting type changes, we only want to add the ability to change a column's type in the metadata. The scenario we hit: a user needs a type change (e.g. int is not enough and they need a long). They have already changed the type in their data files, but without this feature we have to hack the metastore or recreate the whole table.
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22326 @holdenk Thanks, sorry for the typo.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127710 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) --- End diff -- Could I try to answer this? In this scenario the projection is only used for a left semi join that has been turned into a cross join. It ensures the output contains only left-side attributes.
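The rewrite in the diff is `Project(left output, Filter(others, Join(..., Cross, ...)))`. Below is a minimal plain-Python sketch of that shape, with rows modeled as dicts. The names `cross_join`, `semi_join_via_cross` and `semi_join_reference` are hypothetical, and column names on the two sides are assumed to be disjoint. The filter needs both sides' columns to evaluate the UDF. The final projection drops the right-side columns so the result has the shape of a semi-join output.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def cross_join(left: List[Row], right: List[Row]) -> List[Row]:
    # Every pairing of a left row with a right row (column names assumed disjoint).
    return [{**l, **r} for l in left for r in right]


def semi_join_via_cross(left: List[Row], right: List[Row],
                        pred: Callable[[Row], bool]) -> List[Row]:
    left_cols = list(left[0].keys()) if left else []
    joined = cross_join(left, right)                 # predicate can see both sides
    filtered = [row for row in joined if pred(row)]  # UDF evaluated above the join
    # Project back to the left-side columns, as a semi join would output.
    return [{c: row[c] for c in left_cols} for row in filtered]


def semi_join_reference(left: List[Row], right: List[Row],
                        pred: Callable[[Row], bool]) -> List[Row]:
    # Direct definition: keep each left row once if any right row matches.
    return [l for l in left if any(pred({**l, **r}) for r in right)]


left = [{"a": 1, "a1": 1, "a2": 1}, {"a": 2, "a1": 2, "a2": 2}]
right = [{"b": 1, "b1": 1, "b2": 1}]
print(semi_join_via_cross(left, right, lambda row: row["a"] == row["b"]))
```

In this model, a left row that matched several right rows would appear once per match after the projection, whereas the reference keeps it once. The sample data (taken from `test_udf_in_left_semi_join_condition`) has at most one match per left row, so the two agree.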
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127673 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // only need to add cross join when whole commonJoinCondition are unevaluable + val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { --- End diff -- Thanks. After a detailed check, I changed this to `others.nonEmpty`. The earlier check may reflect an unnecessary worry about commonJoinCondition containing both evaluable and unevaluable conditions. I'll also add a test in the next commit to ensure this.
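After the change to `others.nonEmpty`, the inner-join branch works as follows. Common conditions are partitioned into those evaluable inside the join and the rest. Any unevaluable predicate turns the join into a cross join, with those predicates applied as a filter above it; if cross joins are disabled, an error is raised early.

The sketch below models this in plain Python, assuming conditions are labeled strings. `Cond`, `AnalysisError` and `plan_inner_join` are hypothetical names, not Catalyst APIs; Catalyst works on expression trees.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Cond:
    """A join predicate; evaluable_in_join=False models e.g. a Python UDF."""
    sql: str
    evaluable_in_join: bool = True


class AnalysisError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException."""


def plan_inner_join(common: List[Cond], cross_join_enabled: bool
                    ) -> Tuple[str, Optional[str], Optional[str]]:
    """Return (join_type, condition_kept_in_join, filter_above_join)."""
    in_join = [c for c in common if c.evaluable_in_join]
    others = [c for c in common if not c.evaluable_in_join]
    join_cond = " AND ".join(c.sql for c in in_join) or None
    if not others:
        # Nothing has to move: keep the join type and the whole condition.
        return ("Inner", join_cond, None)
    if not cross_join_enabled:
        # Fail early with a readable message instead of a later cartesian-product error.
        raise AnalysisError("join condition has predicates that cannot be "
                            "evaluated within the join; enable cross joins")
    # Unevaluable predicates become a Filter on top of a cross join.
    return ("Cross", join_cond, " AND ".join(c.sql for c in others))


udf = Cond("f(a, b)", evaluable_in_join=False)
print(plan_inner_join([udf, Cond("a = b")], cross_join_enabled=True))
```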
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // CheckCartesianProducts, we throw firstly here for better readable information. + throw new AnalysisException("Detected the whole commonJoinCondition:" + +s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + +"join to cross join by setting the configuration variable " + +s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") +} + } + + /** + * Generate new left and right child of join by pushing down the side only join filter, + * split commonJoinCondition based on the expression can be evaluated within join or not. + * + * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) --- End diff -- Got it, I just saw the example here: https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140. I'll remove it in the next commit.
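The diff context ends with `(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)`. That tuple is the split which `getNewChildAndSplitCondForJoin` starts from: predicates referencing only left or only right attributes can be pushed to that side. A plain-Python sketch of the split step follows, assuming each predicate carries its referenced attribute names and a determinism flag. `Pred` and `split` are illustrative names.

Non-deterministic predicates are never pushed down. They land in the common group even when they reference one side only, which matches `x.a = rand(10)` staying above the join in the FilterPushdownSuite case.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Pred:
    sql: str
    references: FrozenSet[str]
    deterministic: bool = True


def split(conds: List[Pred], left_out: FrozenSet[str], right_out: FrozenSet[str]
          ) -> Tuple[List[Pred], List[Pred], List[Pred]]:
    """Return (left_only, right_only, common + non_deterministic)."""
    push_down = [c for c in conds if c.deterministic]
    non_det = [c for c in conds if not c.deterministic]
    # Predicates whose references are a subset of one side's output can be pushed there.
    left_only = [c for c in push_down if c.references <= left_out]
    rest = [c for c in push_down if not c.references <= left_out]
    right_only = [c for c in rest if c.references <= right_out]
    common = [c for c in rest if not c.references <= right_out]
    return left_only, right_only, common + non_det
```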
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216122599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for your question; that's also what I considered while doing the compatibility check. Hive does this column type change in [HiveAlterHandler](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L175), and the detailed compatibility check is in [ColumnType](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ColumnType.java#L206). As you can see in the ColumnType check, it actually uses `canCast` semantics to judge compatibility.
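As a rough illustration of a cast-based compatibility check guarded by a flag (the flag mirrors the Hive conf suggested earlier in this thread), here is a plain-Python sketch. The rules are hypothetical and simplified, not Hive's `ColumnType` logic or Spark's implementation:

- integral types may only widen (so `int` to `bigint` is allowed, as in the user scenario above);
- any type may become `string`;
- everything else is rejected.

`is_compatible_change`, `check_type_change` and `disallow_incompatible` are illustrative names.

```python
# Hypothetical widening order for integral types; not Hive's actual rules.
INTEGRAL_ORDER = ["tinyint", "smallint", "int", "bigint"]


def is_compatible_change(old_type: str, new_type: str) -> bool:
    """Return True if every value of old_type can be represented in new_type."""
    if old_type == new_type:
        return True
    if new_type == "string":
        # Assumption: any value can be rendered as a string.
        return True
    if old_type in INTEGRAL_ORDER and new_type in INTEGRAL_ORDER:
        # Only widening is safe, e.g. int -> bigint.
        return INTEGRAL_ORDER.index(new_type) > INTEGRAL_ORDER.index(old_type)
    return False


def check_type_change(column: str, old_type: str, new_type: str,
                      disallow_incompatible: bool = True) -> None:
    """Raise if the change is incompatible and the (hypothetical) guard is on."""
    if disallow_incompatible and not is_compatible_change(old_type, new_type):
        raise ValueError(
            f"cannot change column '{column}' from type '{old_type}' to '{new_type}'")
```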
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 retest this please.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215877417 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. -comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) +withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, +checkAnalysis = false) +} + } + + test("join condition pushdown: deterministic and non-deterministic in left semi join") { --- End diff -- I didn't add SPARK-25314 to the test name because this test is only a supplement to test("join condition pushdown: deterministic and non-deterministic").
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876606 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => +case LeftSemi => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. --- End diff -- No problem, done in 87440b0.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable --- End diff -- Thanks, done in 87440b0. I'll pay attention to this in future work.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215860035 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for the advice. I should also check type compatibility; added in ef65c4d.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859851 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1697,6 +1697,16 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'") assert(getMetadata("col1").getString("key") == "value") assert(getMetadata("col1").getString("comment") == "this is col1") + +// Ensure that changing column type takes effect +sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING") +val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1") +assert(column.get.dataType == StringType) + +// Ensure that changing partition column type throw exception +intercept[AnalysisException] { + sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING") +} --- End diff -- Thanks, done in ef65c4d. I also added a test for the type compatibility check.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859764 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + s"'${newColumn.name}' with type '${newColumn.dataType}'") --- End diff -- After adding the type check, maybe we should also keep the type information in the error message.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859681 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { --- End diff -- Thanks, this is not enough yet; I added a type compatibility check in ef65c4d.
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22326 Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal, many thanks for the advice. Please take a look when you have time.
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 Thanks @BryanCutler @HyukjinKwon !
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215636283 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -187,6 +191,9 @@ private[spark] class BarrierCoordinator( requesters.clear() cancelTimerTask() } + +// Check for clearing internal data, visible for test only. +private[spark] def cleanCheck(): Boolean = requesters.isEmpty && timerTask == null --- End diff -- Added an internal-data cleanup check, as suggested in Xingbo's comment: https://github.com/apache/spark/pull/22165#issuecomment-415086679.
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215635587 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.apache.spark._ +import org.apache.spark.rpc.RpcTimeout + +class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext { + + /** + * Get the current barrierEpoch from barrierCoordinator.states by ContextBarrierId + */ + def getCurrentBarrierEpoch( + stageId: Int, stageAttemptId: Int, barrierCoordinator: BarrierCoordinator): Int = { +val barrierId = ContextBarrierId(stageId, stageAttemptId) +barrierCoordinator.states.get(barrierId).barrierEpoch + } + + test("normal test for single task") { +sc = new SparkContext("local", "test") +val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv) +val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator) +val stageId = 0 +val stageAttemptNumber = 0 +rpcEndpointRef.askSync[Unit]( + message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, taskAttemptId = 0, +barrierEpoch = 0), + timeout = new RpcTimeout(5 seconds, "rpcTimeOut")) +// sleep for waiting barrierEpoch value change +Thread.sleep(500) --- End diff -- Thanks for the guidance, done in ecf12bd. I'll pay attention to this in future work.
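The test above waits a fixed `Thread.sleep(500)` for `barrierEpoch` to change. A fixed sleep is either too long or, on a slow machine, too short. A common alternative is to poll until the condition holds or a timeout expires; in Scala suites, ScalaTest's `eventually` plays this role.

The comment does not say what ecf12bd changed. Below is only a minimal Python sketch of the polling pattern. `eventually` here is a hypothetical helper, not ScalaTest's API.

```python
import time
from typing import Callable


def eventually(condition: Callable[[], bool], timeout: float = 5.0,
               interval: float = 0.05) -> None:
    """Poll `condition` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout}s")
        time.sleep(interval)  # short pause between checks instead of one long sleep
```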
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215635071 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator( // Record all active stage attempts that make barrier() call(s), and the corresponding internal // state. - private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + private[spark] val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] --- End diff -- No problem, done in ecf12bd.
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601543 --- Diff: python/pyspark/sql/tests.py --- @@ -269,6 +269,10 @@ def test_struct_field_type_name(self): struct_field = StructField("a", IntegerType()) self.assertRaises(TypeError, struct_field.typeName) +def test_invalid_create_row(slef): +rowClass = Row("c1", "c2") --- End diff -- Thanks, done in eb3f506.
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601486 --- Diff: python/pyspark/sql/types.py --- @@ -1397,6 +1397,8 @@ def _create_row_inbound_converter(dataType): def _create_row(fields, values): +if len(values) > len(fields): +raise ValueError("Can not create %s by %s" % (fields, values)) --- End diff -- Thanks, I improved the message and moved this check to `__call__` in `Row` in eb3f506.
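The check rejects calls that supply more values than the row class declares fields, instead of silently dropping the extras. For a row class created with field names, as in `Row("c1", "c2")`, the idea can be sketched in plain Python as below. `RowFactory` is a hypothetical stand-in that returns a dict, not PySpark's `Row` implementation. Like `zip`, it simply leaves out fields when fewer values are given.

```python
class RowFactory:
    """Hypothetical stand-in for a row class built from field names, e.g. Row("c1", "c2")."""

    def __init__(self, *fields: str) -> None:
        self.fields = fields

    def __call__(self, *values):
        # Reject extra values instead of silently dropping them.
        if len(values) > len(self.fields):
            raise ValueError("Can not create Row with fields %s, expected %d values "
                             "but got %s" % (self.fields, len(self.fields), values))
        return dict(zip(self.fields, values))


row_class = RowFactory("c1", "c2")
print(row_class(1, 2))
```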
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601350 --- Diff: python/pyspark/sql/tests.py --- @@ -269,6 +269,10 @@ def test_struct_field_type_name(self): struct_field = StructField("a", IntegerType()) self.assertRaises(TypeError, struct_field.typeName) +def test_invalid_create_row(slef): --- End diff -- Thanks, done in eb3f506.
[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22341#discussion_r215318105 --- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala --- @@ -55,7 +55,7 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) + private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) --- End diff -- Thanks for explaining.
[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22341#discussion_r215285275 --- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala --- @@ -55,7 +55,7 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) + private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) --- End diff -- Is this related to the problem?
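The thread does not record the explanation behind this change. As general background only: a plain `val` in a Scala `object` is evaluated when the object is initialized, so `SparkEnv.get` must already be available at that moment. A `lazy val` is evaluated on first access instead.

The Python analogy below uses `functools.cached_property` (Python 3.8+). `Env`, `EagerInfo` and `LazyInfo` are hypothetical names.

```python
import functools
from typing import Optional


class Env:
    # Hypothetical global, loosely like SparkEnv.get, which may be unset early on.
    current: Optional[dict] = None


class EagerInfo:
    def __init__(self) -> None:
        # Strict initialization: reads the environment immediately and
        # fails (TypeError on None) if it has not been set up yet.
        self.callsite_form = Env.current["callsite.form"]


class LazyInfo:
    @functools.cached_property
    def callsite_form(self) -> str:
        # Deferred: evaluated on first access, then cached, like a Scala lazy val.
        return Env.current["callsite.form"]
```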
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215271726 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + "join to cross join by setting the configuration variable " + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- ``` I am a bit surprised that works, it would be great to understand why. Thanks. ``` Sorry for the bad test; that case was too special, and the result was right only by accident. The original implementation makes every semi join return `[]` in PySpark.
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 Gentle ping @maropu, could you help review this? I'll keep following up on it.
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 Gentle ping @cloud-fan @gatorsmile @kiszk, we still hit this in our internal fork. Could you help review? I'll resolve the conflict. Many thanks.
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 Gentle ping @HyukjinKwon @BryanCutler
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 ``` Could I do the refactor of moving ContextBarrierState out of BarrierCoordinator? ``` Gentle ping @jiangxb1987, I'm still following up on this. :)
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215261610 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- Thanks @mgaido91 and @dilipbiswal ! I fix this in 63fbcce. 
The main problem was the semi join, with both deterministic and non-deterministic conditions: a Filter added on top of a semi join fails, because a semi join only outputs the left side's columns, so a leftover condition that also reads right-side columns cannot be evaluated there. I also added more tests on both the Python and Scala sides, covering semi join, inner join and the complex scenario described below. Handling left semi made the strategy hard to read, so in 63fbcce I split the semi join logic from the inner join logic.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214974819

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        if (SQLConf.get.crossJoinEnabled) {
+          // if condition expression is unevaluable, it will be removed from
+          // the new join conditions, if all conditions is unevaluable, we should
+          // change the join type to CrossJoin.
+          logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+            "plan is unevaluable, it will be ignored and the join plan will be " +
+            s"turned to cross join. This plan shows below:\n $j")
+          Cross
+        } else {
+          // if the crossJoinEnabled is false, an AnalysisException will throw by
+          // [[CheckCartesianProducts]], we throw firstly here for better readable
+          // information.
+          throw new AnalysisException("Detected the whole commonJoinCondition:" +
+            s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+            "join to cross join by setting the configuration variable " +
+            s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+        }
+      } else {
+        joinType
+      }
+
+    val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+    if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+      Filter(others.reduceLeft(And), join)
+    } else {
+      join
--- End diff --

Thanks, I'll do more testing on the semi join here. However, in my current PySpark test the result is not wrong, so maybe I misunderstood what you two meant by `wrong`: do you mean a correctness issue, or just a benchmark regression?
![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214969191

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        if (SQLConf.get.crossJoinEnabled) {
+          // if condition expression is unevaluable, it will be removed from
+          // the new join conditions, if all conditions is unevaluable, we should
+          // change the join type to CrossJoin.
+          logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+            "plan is unevaluable, it will be ignored and the join plan will be " +
+            s"turned to cross join. This plan shows below:\n $j")
+          Cross
+        } else {
+          // if the crossJoinEnabled is false, an AnalysisException will throw by
+          // [[CheckCartesianProducts]], we throw firstly here for better readable
+          // information.
+          throw new AnalysisException("Detected the whole commonJoinCondition:" +
+            "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
+            " join to cross join by setting the configuration variable" +
+            " spark.sql.crossJoin.enabled = true.")
+        }
+      } else {
+        joinType
+      }
+
+    val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+    if (others.nonEmpty) {
+      Filter(others.reduceLeft(And), join)
--- End diff --

Thanks. There is no need to add an extra Filter in the LeftSemi case.
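Why a Filter does not belong on top of a LeftSemi join can be shown with a tiny model of join output columns: a semi (or anti) join returns only the left side's columns, so a leftover predicate that also reads right-side columns has nothing to bind to above it. This is a minimal plain-Python sketch; `join_output` and `can_filter_on_top` are hypothetical helpers, not Spark APIs.

```python
# Hypothetical, simplified model of join output columns (not Spark's API).
SEMI_LIKE = {"left_semi", "left_anti"}


def join_output(left_cols, right_cols, join_type):
    """Columns a join exposes to the operator placed above it."""
    if join_type in SEMI_LIKE:
        # Semi/anti joins only return the left side's columns.
        return list(left_cols)
    return list(left_cols) + list(right_cols)


def can_filter_on_top(pred_refs, left_cols, right_cols, join_type):
    """True if a Filter whose predicate references pred_refs can sit above the join."""
    return set(pred_refs) <= set(join_output(left_cols, right_cols, join_type))


left_cols, right_cols = ["a", "b"], ["c", "d"]
udf_refs = ["a", "c"]  # e.g. a Python UDF reading one column from each side

print(can_filter_on_top(udf_refs, left_cols, right_cols, "inner"))      # True
print(can_filter_on_top(udf_refs, left_cols, right_cols, "left_semi"))  # False
```

For an inner join the leftover condition can be evaluated in a Filter above the join; for a semi join it cannot, which is consistent with restricting the extra Filter to `InnerLike` joins in the diff.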
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214968900

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        if (SQLConf.get.crossJoinEnabled) {
+          // if condition expression is unevaluable, it will be removed from
+          // the new join conditions, if all conditions is unevaluable, we should
+          // change the join type to CrossJoin.
+          logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+            "plan is unevaluable, it will be ignored and the join plan will be " +
+            s"turned to cross join. This plan shows below:\n $j")
+          Cross
+        } else {
+          // if the crossJoinEnabled is false, an AnalysisException will throw by
+          // [[CheckCartesianProducts]], we throw firstly here for better readable
+          // information.
+          throw new AnalysisException("Detected the whole commonJoinCondition:" +
+            "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
--- End diff --

Thanks, done in 82e50d5.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214968794

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        if (SQLConf.get.crossJoinEnabled) {
+          // if condition expression is unevaluable, it will be removed from
+          // the new join conditions, if all conditions is unevaluable, we should
+          // change the join type to CrossJoin.
+          logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+            "plan is unevaluable, it will be ignored and the join plan will be " +
+            s"turned to cross join. This plan shows below:\n $j")
+          Cross
+        } else {
+          // if the crossJoinEnabled is false, an AnalysisException will throw by
+          // [[CheckCartesianProducts]], we throw firstly here for better readable
+          // information.
+          throw new AnalysisException("Detected the whole commonJoinCondition:" +
+            "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
+            " join to cross join by setting the configuration variable" +
+            " spark.sql.crossJoin.enabled = true.")
--- End diff --

Makes sense. I also changed this in `CheckCartesianProducts`. Done in 82e50d5.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214932266

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+          s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+          s"turned to cross join.")
--- End diff --

Thanks, done in a86a7d5.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214931484

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+          s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+          s"turned to cross join.")
+        Cross
+      } else joinType
--- End diff --

Thanks, done in a86a7d5.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214841211

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+          s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

```
yes, and if the plan is big, than this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.
```

@mgaido91 Thanks for your advice; I'll do the refactor in the next commit.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214840994

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+          s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

@dilipbiswal Thanks for the detailed check. I should have written a more typical case: the case we want to solve is a UDF that accesses attributes from both sides of the join. I'll change the case to `dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))` in the next commit.
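The condition `dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))` is typical because it references columns from both children (left `[a, b]`, right `[c, d]` in the log sample of this thread), so it cannot be pushed into either side and ends up in `commonJoinCondition`. The following is a simplified plain-Python sketch of that three-way split by referenced columns; `split_conjuncts` is a hypothetical helper, and each condition is represented as a (text, referenced columns) pair rather than a Catalyst expression.

```python
def split_conjuncts(conjuncts, left_cols, right_cols):
    """Return (left_only, right_only, common) lists of condition texts."""
    left_set, right_set = set(left_cols), set(right_cols)
    left_only, right_only, common = [], [], []
    for text, refs in conjuncts:
        refs = set(refs)
        if refs <= left_set:
            left_only.append(text)    # can be pushed into the left child
        elif refs <= right_set:
            right_only.append(text)   # can be pushed into the right child
        else:
            common.append(text)       # needs columns from both sides: stays at the join
    return left_only, right_only, common


conds = [
    ("a > 1", ["a"]),
    ("d < 5", ["d"]),
    ("udf(a, c) == udf(d, c)", ["a", "c", "d"]),
]
print(split_conjuncts(conds, ["a", "b"], ["c", "d"]))
# (['a > 1'], ['d < 5'], ['udf(a, c) == udf(d, c)'])
```

The sketch stops at the split; in the diff under review, the common bucket is then further partitioned with `canEvaluateWithinJoin`.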
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214823799

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+        logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+          s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

The log will look like this:

```
16:13:35.218 WARN org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the join plan:
 Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))
:- LocalRelation [a#5, b#6]
+- LocalRelation [c#14, d#15]
 is unevaluable, it will be ignored and the join plan will be turned to cross join.
```
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214823548

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

Thanks for the reminder; `crossJoinEnabled` should be checked here.
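The decision discussed here (and in the later revision of the diff) can be sketched as: if every common condition is unevaluable inside the join, either fall back to a cross join with a warning (when the cross-join config is enabled) or fail early with a message that names the config key. This is a minimal plain-Python model under stated assumptions: `plan_join`, `AnalysisError` and `can_eval` are hypothetical, join types are plain strings, `warnings.warn` stands in for `logWarning`, and the extra Filter that the diff adds on top of inner-like joins is not modeled.

```python
import warnings

CROSS_JOIN_KEY = "spark.sql.crossJoin.enabled"


class AnalysisError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException in this model."""


def plan_join(common_conds, can_eval_within_join, join_type, cross_join_enabled, plan_str):
    """Model of the newJoinType decision; returns (join_type, join_conds, leftover_conds)."""
    join_conds = [c for c in common_conds if can_eval_within_join(c)]
    leftover = [c for c in common_conds if not can_eval_within_join(c)]
    if common_conds and not join_conds:
        if cross_join_enabled:
            # Short summary first, the (possibly large) plan last.
            warnings.warn(
                f"The whole commonJoinCondition:{common_conds} of the join plan is "
                f"unevaluable and will be ignored; the join becomes a cross join. "
                f"Plan:\n{plan_str}")
            return "Cross", join_conds, leftover
        # Fail early with a readable message that names the config key.
        raise AnalysisError(
            f"The whole commonJoinCondition:{common_conds} is unevaluable; "
            f"set {CROSS_JOIN_KEY}=true to allow turning the join into a cross join")
    return join_type, join_conds, leftover


def can_eval(cond):
    # Pretend that only Python UDF conditions are unevaluable inside the join.
    return not cond.startswith("pyUDF")


print(plan_join(["a = c", "pyUDF(a, c)"], can_eval, "Inner", False, ""))
# ('Inner', ['a = c'], ['pyUDF(a, c)'])
```

Keeping the config key in one constant mirrors the review suggestion to build the message from `SQLConf.CROSS_JOINS_ENABLED.key` instead of a hard-coded string.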
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214807566

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in join condition") {
--- End diff --

Got it, I'll add it in this commit.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214807480

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
     val newRight = rightJoinConditions.
       reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-    val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-    Join(newLeft, newRight, joinType, newJoinCond)
+    val (newJoinConditions, others) =
+      commonJoinCondition.partition(canEvaluateWithinJoin)
+    val newJoinCond = newJoinConditions.reduceLeftOption(And)
+    // if condition expression is unevaluable, it will be removed from
+    // the new join conditions, if all conditions is unevaluable, we should
+    // change the join type to CrossJoin.
+    val newJoinType =
+      if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
--- End diff --

Makes sense; I'll log a warning here.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214807200

--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
--- End diff --

Ditto: the correct test is `df = left.join(right, f("a", "b"))`.
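The distinction matters because `left.crossJoin(right).filter(f("a", "b"))` puts the UDF in a Filter above the join, the case the existing `test_udf_in_filter_on_top_of_join` already covers, while `left.join(right, f("a", "b"))` puts it in the join condition, which is what SPARK-25314 is about. For an inner join both forms return the same rows, which is why evaluating the UDF in a Filter on top of an inner join preserves the result. A plain-Python sketch (no Spark; rows are dicts and both helper functions are hypothetical):

```python
def join_with_condition(left_rows, right_rows, predicate):
    """Inner join that evaluates `predicate` as the join condition."""
    return [{**lrow, **rrow} for lrow in left_rows for rrow in right_rows
            if predicate(lrow, rrow)]


def cross_join_then_filter(left_rows, right_rows, predicate):
    """Cross join first, then evaluate `predicate` in a separate filter step."""
    crossed = [(lrow, rrow) for lrow in left_rows for rrow in right_rows]
    return [{**lrow, **rrow} for lrow, rrow in crossed if predicate(lrow, rrow)]


def f(lrow, rrow):
    # Same comparison as the test's udf(lambda a, b: a == b, BooleanType())
    return lrow["a"] == rrow["b"]


left = [{"a": 1}, {"a": 2}]
right = [{"b": 1}, {"b": 3}]
print(join_with_condition(left, right, f))  # [{'a': 1, 'b': 1}]
print(cross_join_then_filter(left, right, f) == join_with_condition(left, right, f))  # True
```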
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214806996

--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
+        self.assertEqual(df.collect(), [Row(a=1, b=1)])
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
--- End diff --

Yep, sorry for the mess here; it was left over from another commit. I'll fix it soon. How can I cancel the test run?
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r214805785

--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
--- End diff --

Sorry for the mess here; I'm changing it now.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
GitHub user xuanyuanking opened a pull request:
https://github.com/apache/spark/pull/22326

[SPARK-25314][SQL] Fix Python UDF accessing attibutes from both side of join in join conditions

## What changes were proposed in this pull request?

Thanks to @bahchis for reporting this. This is essentially a follow-up to #16581: this PR fixes the case where a Python UDF in a join condition accesses attributes from both sides of the join.

## How was this patch tested?

Added regression tests in PySpark and `BatchEvalPythonExecSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25314

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22326.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22326

commit 23b10283500b18462a71eb525d4762dd33c3d4fa
Author: Yuanjian Li
Date: 2018-09-04T06:43:14Z

Fix Python UDF accessing attibutes from both side of join in join conditions
[GitHub] spark pull request #22313: [SPARK-25306][SQL] Use cache to speed up `createF...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22313#discussion_r214528435

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
@@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
+  case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])
+
+  private lazy val cacheExpireTimeout =
+    org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
+
+  private lazy val searchArgumentCache = CacheBuilder.newBuilder()
+    .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
+    .build(
+      new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
+        override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = {
+          buildSearchArgument(
+            typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder())
+        }
+      })
+
+  private def getOrBuildSearchArgumentWithNewBuilder(
--- End diff --

Just a small question: is it possible to reuse the code at https://github.com/apache/spark/pull/22313/files#diff-224b8cbedf286ecbfdd092d1e2e2f237R61?
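The diff memoizes search-argument construction with a Guava `CacheBuilder` keyed by `FilterWithTypeMap` and configured with `expireAfterAccess`, so that building the same (filter, type map) pair twice reuses the first result. Below is a minimal plain-Python sketch of that loading-cache idea, not Guava itself. `ExpireAfterAccessCache` is hypothetical, and the key uses a tuple of (column, type) pairs because a dict is not hashable. Unlike Guava, this sketch never evicts stale entries from memory; it only rebuilds them on the next access.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, Tuple


@dataclass(frozen=True)
class FilterWithTypeMap:
    """Hashable cache key: a filter plus its (column, type) pairs."""
    filter_expr: str
    type_map: Tuple[Tuple[str, str], ...]


class ExpireAfterAccessCache:
    """Loading cache; an entry expires `timeout_s` seconds after its last access."""

    def __init__(self, loader: Callable[[Hashable], object], timeout_s: float,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader
        self._timeout = timeout_s
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[object, float]] = {}

    def get(self, key: Hashable) -> object:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[1] < self._timeout:
            value = entry[0]           # fresh hit: reuse the built value
        else:
            value = self._loader(key)  # miss or expired: rebuild
        self._entries[key] = (value, now)  # record this access time
        return value
```

Because the key is a frozen dataclass, two separately constructed keys with the same filter and type map hit the same entry, which is the property the Scala case class key provides.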