sadikovi commented on a change in pull request #29330:
URL: https://github.com/apache/spark/pull/29330#discussion_r747075592
##########
File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
##########
@@ -166,11 +166,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val tablePath = new Path(relation.tableMeta.location)
val fileFormat = fileFormatClass.getConstructor().newInstance()
+ val inputFormat = relation.tableMeta.storage.inputFormat.getOrElse("")
+ val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+
+ val symlinkTargets = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
+ SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)
+ } else {
+ Nil
+ }
val result = if (relation.isPartitioned) {
val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
- Seq(tablePath)
+ if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
+ symlinkTargets
Review comment:
Should be replaced with
`SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)`. I don't
think we need the if statement above.
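i.e. the `lazyPruningEnabled` branch could just be (a sketch; I'm assuming the `else` keeps the existing `Seq(tablePath)`):

```scala
if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
  SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)
} else {
  Seq(tablePath)
}
```

That way the lookup only runs when the table actually uses the symlink format.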
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.CharStreams
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+object SymlinkTextInputFormatUtil {
Review comment:
Please add Javadoc to the class and the methods.
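Something along these lines, for example (a sketch only; the doc wording and the second method's parameter name are my guesses from the call sites):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

/**
 * Utilities for Hive's `SymlinkTextInputFormat`, where the files under the
 * table location are manifests listing the actual data file paths, one per line.
 */
object SymlinkTextInputFormatUtil {

  /** Returns true iff `inputFormat` is Hive's symlink text input format class name. */
  def isSymlinkTextFormat(inputFormat: String): Boolean =
    inputFormat == "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"

  /**
   * Reads each manifest file under `symlinkDir` and returns the target paths it
   * lists. Mirrors Hive's `SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs`.
   */
  def getTargetPathsFromSymlink(fs: FileSystem, symlinkDir: Path): Seq[Path] = {
    ??? // existing implementation unchanged
  }
}
```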
##########
File path: sql/hive/src/test/resources/data/files/sample1.csv
##########
@@ -0,0 +1 @@
+1,2,3
Review comment:
There should already be other example CSV files in the test resources; can you reuse those? The same applies to Parquet and ORC.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
##########
@@ -92,7 +92,16 @@ abstract class PartitioningAwareFileIndex(
// Directory does not exist, or has no children files
Nil
}
- PartitionDirectory(values, files)
+ // Check leaf files since they might be symlink targets
+ if (files == Nil) {
Review comment:
nit: isEmpty?
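i.e.:

```scala
if (files.isEmpty) {
```

(`files == Nil` does work, since `Seq` equality is element-wise, but `isEmpty` states the intent directly.)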
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.CharStreams
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+object SymlinkTextInputFormatUtil {
+
+ def isSymlinkTextFormat(inputFormat: String): Boolean = {
+ inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat")
+ }
+
+ // Mostly copied from SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs of Hive 3.1
Review comment:
Is it compatible with other versions of Hive?
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.{FileWriter, PrintWriter}
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class SymlinkSuite extends QueryTest with TestHiveSingleton {
+
+ test("symlink csv") {
+ withTempDir { temp =>
+ val drop = "DROP TABLE IF EXISTS symlink_csv"
+ spark.sql(drop)
+
+ val ddl =
+ s"""CREATE TABLE symlink_csv(
+ |a BIGINT,
+ |b BIGINT,
+ |c BIGINT
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+ |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin
+ spark.sql(ddl)
+
+ val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_csv.txt"
+ val prefix = System.getProperty("user.dir")
+ val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.csv\n"
+ val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.csv\n"
+ val writer = new PrintWriter(new FileWriter(testSymlinkPath, false))
+ writer.write(testData1)
+ writer.write(testData2)
+ writer.close()
+
+ val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE
symlink_csv"
+ spark.sql(load)
+
+ val dml = "SELECT * FROM symlink_csv"
+ val df = spark.sql(dml)
+
+ df.show()
+ assert(df.count() == 3)
+ }
+ }
+
+ test("symlink csv partitioned") {
+ withTempDir { temp =>
+ val drop = "DROP TABLE IF EXISTS symlink_csv_partitioned"
+ spark.sql(drop)
+
+ val ddl =
+ s"""CREATE TABLE symlink_csv_partitioned(
+ |a BIGINT,
+ |b BIGINT,
+ |c BIGINT
+ |)
+ |PARTITIONED BY (dt STRING)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+ |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin
+ spark.sql(ddl)
+
+ val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_csv1.txt"
+ val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_csv2.txt"
+ val prefix = System.getProperty("user.dir")
+ val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.csv\n"
+ val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.csv\n"
+ val writer1 = new PrintWriter(new FileWriter(testSymlinkPath1, false))
+ val writer2 = new PrintWriter(new FileWriter(testSymlinkPath2, false))
+ writer1.write(testData1)
+ writer2.write(testData2)
+ writer1.close()
+ writer2.close()
+
+ val load1 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath1}' " +
+ "INTO TABLE symlink_csv_partitioned PARTITION (dt=20200726)"
+ val load2 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath2}' " +
+ "INTO TABLE symlink_csv_partitioned PARTITION (dt=20200727)"
+ spark.sql(load1)
+ spark.sql(load2)
+
+ val dml = "SELECT * FROM symlink_csv_partitioned"
+ val df = spark.sql(dml)
+
+ df.show()
+ assert(df.count() == 3)
+ }
+ }
+
+ test("symlink orc") {
+ withTempDir { temp =>
+ val drop = "DROP TABLE IF EXISTS symlink_orc"
+ spark.sql(drop)
+
+ val ddl =
+ s"""CREATE TABLE symlink_orc(
+ |a BIGINT,
+ |b BIGINT,
+ |c BIGINT
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+ |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+ |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin
+ spark.sql(ddl)
+
+ val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_orc.txt"
+ val prefix = System.getProperty("user.dir")
+ val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.orc\n"
+ val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.orc\n"
+ val writer = new PrintWriter(new FileWriter(testSymlinkPath, false))
+ writer.write(testData1)
+ writer.write(testData2)
+ writer.close()
+
+ val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE
symlink_orc"
+ spark.sql(load)
+
+ val dml = "SELECT * FROM symlink_orc"
+ val df = spark.sql(dml)
+
+ df.show()
+ assert(df.count() == 3)
+ }
+ }
+
+ test("symlink orc partitioned") {
+ withTempDir { temp =>
+ val drop = "DROP TABLE IF EXISTS symlink_orc_partitioned"
+ spark.sql(drop)
+
+ val ddl =
+ s"""CREATE TABLE symlink_orc_partitioned(
+ |a BIGINT,
+ |b BIGINT,
+ |c BIGINT
+ |)
+ |PARTITIONED BY (dt STRING)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+ |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+ |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin
+ spark.sql(ddl)
+
+ val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_orc1.txt"
+ val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_orc2.txt"
+ val prefix = System.getProperty("user.dir")
Review comment:
Can you explain why this relies on `user.dir`? Is it possible to use a temporary path here instead?
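For example, the test could resolve the sample file from the classpath and copy it into the per-test temp dir, so the manifest never depends on the working directory (a sketch; assumes the sample files are on the test classpath under `data/files/`):

```scala
import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}

// Locate the resource via the classloader rather than user.dir.
val url = Thread.currentThread().getContextClassLoader
  .getResource("data/files/sample1.csv")
// Copy it under the temp dir created by withTempDir.
val target = new File(temp, "sample1.csv").toPath
Files.copy(Paths.get(url.toURI), target, StandardCopyOption.REPLACE_EXISTING)
val testData1 = s"file://$target\n"
```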
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.