[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r204098057 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -270,7 +270,7 @@ case class FileSourceScanExec( private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - override val metadata: Map[String, String] = { + override lazy val metadata: Map[String, String] = { --- End diff -- Ouch. I'd have never thought about any code with `RDD` and physical operators on the executor-side (!) Learnt it today. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
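To make the executor-side surprise above concrete, here is a toy sketch (plain Scala, not Spark code; all names are made up): an eager `val` in a case class body is evaluated as soon as the node is re-instantiated, which during canonicalization can happen on an executor where no active session exists, while a `lazy val` is only evaluated if something actually reads it.

```scala
object LazyValDemo extends App {
  // Stand-in for "something only available on the driver", e.g. the active SparkSession.
  case class EagerNode(session: Option[String]) {
    val confValue: String = session.get       // evaluated at construction time
  }
  case class LazyNode(session: Option[String]) {
    lazy val confValue: String = session.get  // deferred until first access
  }

  // Re-instantiating the eager variant without a session would blow up immediately,
  // analogous to what happened when canonicalizing FileSourceScanExec on an executor.
  // EagerNode(None)                          // would throw NoSuchElementException here
  val canonicalizedCopy = LazyNode(None)      // fine, nothing touches confValue
  println(canonicalizedCopy)
}
```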
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21815 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203980465 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable on executor side") { --- End diff -- `SparkPlanSuite` SGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203979619 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable on executor side") { --- End diff -- I found `SparkPlanSuite` could be another place to add this to address your comment. Let me stick to `FileSourceScanExec`, but please let me know if you prefer that one; I don't mind changing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203979151 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable on executor side") { --- End diff -- I think I can actually put this under `SparkPlanSuite`. Let me put it in there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203976429 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable on executor side") { --- End diff -- There are a few things bothering me about that, actually: it's kind of messy to create a `FileSourceScanExec` without a `SparkSession` (and also without the other utils from `SharedSQLContext`), and `QueryPlanSuite` is under `catalyst` whereas this plan itself is under `execution` in SQL core. Also, I believe this PR is really about making the plan canonicalizable after it's de/serialized; the plan is already serializable and deserializable, but it's not canonicalizable after that. Let me try to clean it up based on your comment anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203947912 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable on executor side") { --- End diff -- I'd like to put this test in `QueryPlanSuite`, with name `SPARK-: query plans can be serialized and deserialized`. In the test we don't need to trigger a job, just call `spark.env.serializer` to serialize and deserialize the `FileSourceScanExec` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
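A rough sketch of what the suggested test could look like; the suite and test names are placeholders, and `SparkEnv.get.serializer` is used here as one way to reach the serializer the comment refers to. The plan is round-tripped through the serializer instead of running a job, and then canonicalized.

```scala
package org.apache.spark.sql.execution

import org.apache.spark.SparkEnv
import org.apache.spark.sql.test.SharedSQLContext

// Sketch only: serialize and deserialize the plan with the configured serializer,
// then check that the deserialized copy can still be canonicalized.
class PlanSerializationSuite extends SharedSQLContext {
  test("SPARK-23731: FileSourceScanExec is canonicalizable after (de)serialization") {
    withTempPath { path =>
      spark.range(1).write.parquet(path.getAbsolutePath)
      val df = spark.read.parquet(path.getAbsolutePath)
      val scan = df.queryExecution.sparkPlan
        .collectFirst { case p: FileSourceScanExec => p }.get

      val serializer = SparkEnv.get.serializer.newInstance()
      val roundTripped =
        serializer.deserialize[FileSourceScanExec](serializer.serialize(scan))

      // Used to throw a NullPointerException before the session-dependent vals were lazy.
      roundTripped.canonicalized
    }
  }
}
```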
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203682853 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + try { +spark.range(1).foreach(_ => fileSourceScanExec.canonicalized) + } catch { +case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e) --- End diff -- Hm, this gives an explicit scope for which condition is the failure case, though. I believe this is a rather common pattern. If both are okay, let me just keep it this way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203680931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -270,7 +270,7 @@ case class FileSourceScanExec( private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - override val metadata: Map[String, String] = { + override lazy val metadata: Map[String, String] = { --- End diff -- It can actually be evaluated on the executor side:
```
    at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:275)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
    at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
    at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203680278 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + try { +spark.range(1).foreach(_ => fileSourceScanExec.canonicalized) --- End diff -- Yes, I think it is, of course.. it took me a while to come up with a small and simple test for this one.. I hope to leave that out of this PR's scope, though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203680097 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { -SparkSession.setActiveSession(sqlContext.sparkSession) +if (sqlContext != null) { --- End diff -- I see, thanks. I wondered because it seems like a more general issue that is easier to hit, but we probably never ran into it since all the attempts involved `FileSourceScanExec`, which failed earlier... thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203679895 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -166,10 +166,10 @@ case class FileSourceScanExec( override val tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan { - override val supportsBatch: Boolean = relation.fileFormat.supportBatch( + override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch( relation.sparkSession, StructType.fromAttributes(output)) - override val needsUnsafeRowConversion: Boolean = { + override lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled --- End diff -- Let's leave this out of this PR's scope. That's more like making the plan workable, whereas this PR targets making the plan canonicalizable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203679141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { -SparkSession.setActiveSession(sqlContext.sparkSession) +if (sqlContext != null) { --- End diff -- Because it failed earlier, before reaching this point. Once we go with `lazy`, this one is discovered later (it's the exception in the PR description). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
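For context, the guarded copy being discussed looks roughly like this (reconstructed from the diff above, so treat the exact body as an approximation): on an executor `sqlContext` is null, so the active-session bookkeeping is simply skipped there.

```scala
// Reconstructed sketch of the patched SparkPlan.makeCopy (fragment, not verbatim):
// only set the active session when a sqlContext is available, i.e. on the driver.
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
  if (sqlContext != null) {
    SparkSession.setActiveSession(sqlContext.sparkSession)
  }
  super.makeCopy(newArgs)
}
```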
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203671394 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -199,7 +199,7 @@ case class FileSourceScanExec( ret } - override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { + override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { --- End diff -- If it is not lazy, though, it'd be computed anyway when we create a new `FileSourceScanExec` in the canonicalization process, so I'd say this one is needed, as well as all the others. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203669396 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { -SparkSession.setActiveSession(sqlContext.sparkSession) +if (sqlContext != null) { --- End diff -- just curious, why wasn't the `makeCopy` problem discovered in the previous PR/investigation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203668475 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -166,10 +166,10 @@ case class FileSourceScanExec( override val tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan { - override val supportsBatch: Boolean = relation.fileFormat.supportBatch( + override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch( --- End diff -- nit: can we maybe add a comment about the reason we are making them lazy? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
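One possible wording for such a comment, applied to the first of the affected vals (purely a suggestion; the val definition itself is the one quoted in the diffs above):

```scala
// These vals are lazy so that constructing this node (e.g. while canonicalizing it
// after deserialization on an executor, where no SparkSession is active) does not
// eagerly touch the session or its conf.
override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
  relation.sparkSession, StructType.fromAttributes(output))
```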
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203669375 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + try { +spark.range(1).foreach(_ => fileSourceScanExec.canonicalized) --- End diff -- Not sure whether it is feasible (maybe in a follow-up?), but it would be great if we could test the canonicalization of all the Exec nodes in order to prevent such issues in the future... what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666346 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) --- End diff -- Redundant `toDF` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
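In other words, the write line in the quoted test can simply become:

```scala
// spark.range(1) already returns a Dataset, so it can be written out directly.
spark.range(1).write.parquet(path.getAbsolutePath)
```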
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203665574 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -270,7 +270,7 @@ case class FileSourceScanExec( private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - override val metadata: Map[String, String] = { + override lazy val metadata: Map[String, String] = { --- End diff -- That's driver-only too, isn't it? Why is this `lazy` required? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666893 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get --- End diff -- This `isInstanceOf` is a bit non-Scala IMHO and I'd prefer `collectFirst { case op: FileSourceScanExec => op }` instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
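Applied to the quoted test, the suggestion reads:

```scala
// Pattern matching instead of find(_.isInstanceOf[...]) followed by a cast-free get.
val fileSourceScanExec =
  df.queryExecution.sparkPlan.collectFirst { case op: FileSourceScanExec => op }.get
```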
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666125 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { --- End diff -- nit: s/in/on --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203667943 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + try { +spark.range(1).foreach(_ => fileSourceScanExec.canonicalized) + } catch { +case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e) --- End diff -- It's a named test so I'd get rid of the `try-catch` block because: 1. It's going to fail the test anyway 2. The title of the test matches the `fail` message. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
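With the try-catch removed, the body of the quoted test would reduce to something like the line below; any exception thrown there fails the named test on its own, so no explicit `fail(...)` is needed.

```scala
// Canonicalize the captured plan inside a task, i.e. on an executor.
spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
```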
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203664621 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -166,10 +166,10 @@ case class FileSourceScanExec( override val tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan { - override val supportsBatch: Boolean = relation.fileFormat.supportBatch( + override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch( relation.sparkSession, StructType.fromAttributes(output)) - override val needsUnsafeRowConversion: Boolean = { + override lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled --- End diff -- Since you mentioned `SparkSession`, that line caught my attention: the active `SparkSession` is accessed using `SparkSession.getActiveSession.get`, not `relation.sparkSession` as in other places. I think that's worth considering changing while we're at it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
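If that change were made, the quoted block would look roughly like this (the `else` branch is reconstructed, as it is not shown in the diff):

```scala
override lazy val needsUnsafeRowConversion: Boolean = {
  if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    // Use the relation's session rather than the thread-local active session.
    relation.sparkSession.sessionState.conf.parquetVectorizedReaderEnabled
  } else {
    false
  }
}
```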
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203665187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -199,7 +199,7 @@ case class FileSourceScanExec( ret } - override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { + override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { --- End diff -- That happens on the driver so no need for the `lazy` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/21815

[SPARK-23731][SQL] Make FileSourceScanExec canonicalizable in executor side

## What changes were proposed in this pull request?

### What's the problem?

In some cases, a scalar subquery can throw an NPE, which is raised on the executor side.

```
java.lang.NullPointerException
    at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
    at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
    at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
    at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
    at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
    at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.get(HashMap.scala:70)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
    at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
    at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: