Repository: spark Updated Branches: refs/heads/master 606ae491e -> 40de176c9
[SPARK-16496][SQL] Add wholetext as option for reading text in SQL. ## What changes were proposed in this pull request? In multiple text analysis problems, it is not often desirable for the rows to be split by "\n". There exists a wholeText reader for RDD API, and this JIRA just adds the same support for Dataset API. ## How was this patch tested? Added relevant new tests for both scala and Java APIs Author: Prashant Sharma <prash...@in.ibm.com> Author: Prashant Sharma <prash...@apache.org> Closes #14151 from ScrapCodes/SPARK-16496/wholetext. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40de176c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40de176c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40de176c Branch: refs/heads/master Commit: 40de176c93c5aa05bcbb1328721118b6b46ba51d Parents: 606ae49 Author: Prashant Sharma <prash...@in.ibm.com> Authored: Thu Dec 14 11:19:34 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Dec 14 11:19:34 2017 -0800 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 7 +- .../org/apache/spark/sql/DataFrameReader.scala | 16 ++- .../datasources/HadoopFileWholeTextReader.scala | 57 ++++++++++ .../datasources/text/TextFileFormat.scala | 31 +++++- .../datasources/text/TextOptions.scala | 7 ++ .../execution/datasources/text/TextSuite.scala | 5 +- .../datasources/text/WholeTextFileSuite.scala | 108 +++++++++++++++++++ 7 files changed, 221 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1ad974e..4e58bfb 100644 --- a/python/pyspark/sql/readwriter.py +++ 
b/python/pyspark/sql/readwriter.py @@ -304,7 +304,7 @@ class DataFrameReader(OptionUtils): @ignore_unicode_prefix @since(1.6) - def text(self, paths): + def text(self, paths, wholetext=False): """ Loads text files and returns a :class:`DataFrame` whose schema starts with a string column named "value", and followed by partitioned columns if there @@ -313,11 +313,16 @@ class DataFrameReader(OptionUtils): Each line in the text file is a new row in the resulting DataFrame. :param paths: string, or list of strings, for input path(s). + :param wholetext: if true, read each file from input path(s) as a single row. >>> df = spark.read.text('python/test_support/sql/text-test.txt') >>> df.collect() [Row(value=u'hello'), Row(value=u'this')] + >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True) + >>> df.collect() + [Row(value=u'hello\\nthis')] """ + self._set_opts(wholetext=wholetext) if isinstance(paths, basestring): paths = [paths] return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths))) http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ea1cf66..39fec8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Loads text files and returns a `DataFrame` whose schema starts with a string column named * "value", and followed by partitioned columns if there are any. * - * Each line in the text files is a new row in the resulting DataFrame. 
For example: + * You can set the following text-specific option(s) for reading text files: + * <ul> + * <li>`wholetext` ( default `false`): If true, read a file as a single row and not split by "\n". + * </li> + * </ul> + * By default, each line in the text files is a new row in the resulting DataFrame. + * + * Usage example: * {{{ * // Scala: * spark.read.text("/path/to/spark/README.md") @@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * If the directory structure of the text files contains partitioning information, those are * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. * - * Each line in the text files is a new element in the resulting Dataset. For example: + * You can set the following textFile-specific option(s) for reading text files: + * <ul> + * <li>`wholetext` ( default `false`): If true, read a file as a single row and not split by "\n". + * </li> + * </ul> + * By default, each line in the text files is a new row in the resulting DataFrame. For example: * {{{ * // Scala: * spark.read.textFile("/path/to/spark/README.md") http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala new file mode 100644 index 0000000..c61a89e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.Closeable +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark.input.WholeTextFileRecordReader + +/** + * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which is all of the lines + * in that file. 
+ */ +class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration) + extends Iterator[Text] with Closeable { + private val iterator = { + val fileSplit = new CombineFileSplit( + Array(new Path(new URI(file.filePath))), + Array(file.start), + Array(file.length), + // TODO: Implement Locality + Array.empty[String]) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0) + reader.initialize(fileSplit, hadoopAttemptContext) + new RecordReaderIterator(reader) + } + + override def hasNext: Boolean = iterator.hasNext + + override def next(): Text = iterator.next() + + override def close(): Unit = iterator.close() +} http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index d069044..8a6ab30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -17,12 +17,16 @@ package org.apache.spark.sql.execution.datasources.text +import java.io.Closeable + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.TaskContext -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.{AnalysisException, SparkSession} import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} @@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { } } + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + val textOptions = new TextOptions(options) + super.isSplitable(sparkSession, options, path) && !textOptions.wholeText + } + override def inferSchema( sparkSession: SparkSession, options: Map[String, String], @@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { assert( requiredSchema.length <= 1, "Text data source only produces a single data column named \"value\".") - + val textOptions = new TextOptions(options) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText) + } + + private def readToUnsafeMem(conf: Broadcast[SerializableConfiguration], + requiredSchema: StructType, wholeTextMode: Boolean): + (PartitionedFile) => Iterator[UnsafeRow] = { + (file: PartitionedFile) => { - val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value) + val confValue = conf.value.value + val reader = if (!wholeTextMode) { + new HadoopFileLinesReader(file, confValue) + } else { + new HadoopFileWholeTextReader(file, confValue) + } Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close())) - if (requiredSchema.isEmpty) { val emptyUnsafeRow = new UnsafeRow(0) reader.map(_ => emptyUnsafeRow) http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 49bd738..2a66156 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti * Compression codec to use. */ val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName) + + /** + * wholetext - If true, read a file as a single row and not split by "\n". + */ + val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean + } private[text] object TextOptions { val COMPRESSION = "compression" + val WHOLETEXT = "wholetext" } http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index cb7393c..3328704 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -185,10 +185,9 @@ class TextSuite extends QueryTest with SharedSQLContext { val data = df.collect() assert(data(0) == Row("This is a test file for the text data source")) assert(data(1) == Row("1+1")) - // non ascii characters are not allowed in the code, so we disable the scalastyle here. 
- // scalastyle:off + // scalastyle:off nonascii assert(data(2) == Row("数据砖头")) - // scalastyle:on + // scalastyle:on nonascii assert(data(3) == Row("\"doh\"")) assert(data.length == 4) } http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala new file mode 100644 index 0000000..8bd736b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.text + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructType} + +class WholeTextFileSuite extends QueryTest with SharedSQLContext { + + // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which + // can cause Filesystem.get(Configuration) to return a cached instance created with a different + // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused + // hard-to-reproduce test failures, since any suites that were run after this one would inherit + // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this, + // we disable FileSystem caching in this suite. + protected override def sparkConf = + super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true") + + private def testFile: String = { + Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString + } + + test("reading text file with option wholetext=true") { + val df = spark.read.option("wholetext", "true") + .format("text").load(testFile) + // schema + assert(df.schema == new StructType().add("value", StringType)) + + // verify content + val data = df.collect() + assert(data(0) == + Row( + // scalastyle:off nonascii + """This is a test file for the text data source + |1+1 + |数据砖头 + |"doh" + |""".stripMargin)) + // scalastyle:on nonascii + assert(data.length == 1) + } + + test("correctness of wholetext option") { + import org.apache.spark.sql.catalyst.util._ + withTempDir { dir => + val file1 = new File(dir, "text1.txt") + stringToFile(file1, + """text file 1 contents. + |From: None to: ?? 
+ """.stripMargin) + val file2 = new File(dir, "text2.txt") + stringToFile(file2, "text file 2 contents.") + val file3 = new File(dir, "text3.txt") + stringToFile(file3, "text file 3 contents.") + val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath) + // Since wholetext option reads each file into a single row, df.length should be no. of files. + val data = df.sort("value").collect() + assert(data.length == 3) + // Each files should represent a single Row/element in Dataframe/Dataset + assert(data(0) == Row( + """text file 1 contents. + |From: None to: ?? + """.stripMargin)) + assert(data(1) == Row( + """text file 2 contents.""".stripMargin)) + assert(data(2) == Row( + """text file 3 contents.""".stripMargin)) + } + } + + + test("Correctness of wholetext option with gzip compression mode.") { + withTempDir { dir => + val path = dir.getCanonicalPath + val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1) + df1.write.option("compression", "gzip").mode("overwrite").text(path) + // On reading through wholetext mode, one file will be read as a single row, i.e. not + // delimited by "next line" character. + val expected = Row(Range(0, 1000).mkString("", "\n", "\n")) + Seq(10, 100, 1000).foreach { bytes => + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) { + val df2 = spark.read.option("wholetext", "true").format("text").load(path) + val result = df2.collect().head + assert(result === expected) + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org